[jira] [Created] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores

2020-07-09 Thread Gordon Wang (Jira)
Gordon Wang created KAFKA-10252:
---

 Summary: MeteredTimestampedKeyValueStore not setting up correct 
topic name for GlobalKTable associating StateStores
 Key: KAFKA-10252
 URL: https://issues.apache.org/jira/browse/KAFKA-10252
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.1
Reporter: Gordon Wang


When creating a GlobalKTable to get a ReadOnlyKeyValueStore like below:
{code}
GlobalKTable globalTable = streamsBuilder.globalTable(topic,
    Consumed.with(keySerde, valueSerde),
    Materialized.as(Stores.inMemoryKeyValueStore(topic)));
{code}
I got a StreamsException like the one below:
{code}
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro 
schema for topic applicationId-sourceTopicName-changelog
Caused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Subject not found.; error code: 40401
{code}
But as stated in the GlobalKTable Javadoc, the changelog stream should not be created, and in fact it was not. This leads our custom serde to search for the schema (we are using Confluent Platform and an Avro-based Schema Registry for this job) under the wrong topic name (it should be just sourceTopicName rather than applicationId-sourceTopicName-changelog).
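For context, here is a minimal sketch of why the wrong topic name surfaces as "Subject not found", assuming the registry's default TopicNameStrategy naming (value subject = topic + "-value"); the helper below is illustrative, not Kafka or Schema Registry code:
{code:java}
final class SubjectSketch {
    // Default TopicNameStrategy: the value subject is derived from the topic name.
    static String valueSubject(final String topic) {
        return topic + "-value";
    }

    public static void main(final String[] args) {
        // Registered by the producer of the source topic:
        System.out.println(valueSubject("sourceTopicName"));
        // What the store's serde asks for -- a subject that was never registered,
        // hence the 40401 "Subject not found" error:
        System.out.println(valueSubject("applicationId-sourceTopicName-changelog"));
    }
}
{code}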
After digging into the code, I found that the *initStoreSerde* method in *MeteredTimestampedKeyValueStore* assumes the topic backing the store is always the store changelog topic when it initializes the Serdes for the state store. I think that for GlobalKTables (those whose ProcessorContext is a GlobalProcessorContextImpl) we should use the original topic name directly here.
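A minimal sketch of the direction the attached patch takes (illustrative only, not the actual patch; serdeTopic and the sourceTopic parameter are my own names):
{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;

final class SerdeTopicSketch {
    // Pick the topic used to configure the store's serdes: global stores are
    // restored straight from the source topic, so no changelog topic exists.
    static String serdeTopic(final ProcessorContext context,
                             final String storeName,
                             final String sourceTopic) {
        if (context instanceof GlobalProcessorContextImpl) {
            return sourceTopic;
        }
        return ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
    }
}
{code}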



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac opened a new pull request #8998: MINOR; KafkaAdminClient#describeLogDirs should not fail all the futures when only one call fails

2020-07-09 Thread GitBox


dajac opened a new pull request #8998:
URL: https://github.com/apache/kafka/pull/8998


   Following https://github.com/apache/kafka/pull/8985, I have found another case which incorrectly completes all the futures when a single call fails.
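   A hedged sketch of the per-future pattern the title refers to (illustrative, not the actual KafkaAdminClient code): complete or fail each broker's future individually instead of failing the whole map on the first error.
   ```java
   import java.util.Map;
   import org.apache.kafka.common.internals.KafkaFutureImpl;

   final class PerBrokerFutures {
       // Complete each broker's future from its own result or error, so one failed
       // call no longer fails the futures of the brokers that succeeded.
       static void complete(final Map<Integer, KafkaFutureImpl<String>> futures,
                            final Map<Integer, String> results,
                            final Map<Integer, Throwable> errors) {
           futures.forEach((brokerId, future) -> {
               final Throwable error = errors.get(brokerId);
               if (error != null) {
                   future.completeExceptionally(error); // only this broker's future fails
               } else {
                   future.complete(results.get(brokerId));
               }
           });
       }
   }
   ```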
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8998: MINOR; KafkaAdminClient#describeLogDirs should not fail all the futures when only one call fails

2020-07-09 Thread GitBox


dajac commented on pull request #8998:
URL: https://github.com/apache/kafka/pull/8998#issuecomment-655961172


   cc @cmccabe @ijuma 







[jira] [Commented] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores

2020-07-09 Thread Gordon Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154293#comment-17154293
 ] 

Gordon Wang commented on KAFKA-10252:
-

Attached a patch for the fix.

> MeteredTimestampedKeyValueStore not setting up correct topic name for 
> GlobalKTable associating StateStores
> --
>
> Key: KAFKA-10252
> URL: https://issues.apache.org/jira/browse/KAFKA-10252
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Gordon Wang
>Priority: Major
> Attachments: 
> 0001-KAFKA-10252-Use-source-topic-name-for-MeteredTimesta.patch
>





[GitHub] [kafka] dajac opened a new pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac opened a new pull request #8999:
URL: https://github.com/apache/kafka/pull/8999


   As pointed out by @ijuma in https://github.com/apache/kafka/pull/8990#discussion_r451059876, using a `Set` is not necessary, as the caller only needs the list of timed-out connections/nodes; it does nothing but iterate over it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


rajinisivaram commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452040424



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
         return connectingNodes.stream()
                 .filter(id -> isConnectionSetupTimeout(id, now))
-                .collect(Collectors.toSet());
+                .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   @dajac Won't this create a list regardless of whether there are any timed-out entries? Let's see whether this addresses @ijuma's concern on the other PR.









[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452052951



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
         return connectingNodes.stream()
                 .filter(id -> isConnectionSetupTimeout(id, now))
-                .collect(Collectors.toSet());
+                .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   @rajinisivaram Yes, it does, as an empty list is returned even if there are no timed-out entries. This was true with the Set as well, btw.
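   For what it's worth, a rough sketch of one way to avoid the allocation entirely when nothing has timed out (illustrative; `isTimedOut` stands in for `isConnectionSetupTimeout`):
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Set;

   final class LazyListSketch {
       // Allocate the result list lazily: in the common case where nothing has
       // timed out, callers get the shared immutable empty list instead.
       static List<String> nodesWithTimeout(final Set<String> connectingNodes, final long now) {
           List<String> result = null;
           for (final String id : connectingNodes) {
               if (isTimedOut(id, now)) {
                   if (result == null) {
                       result = new ArrayList<>();
                   }
                   result.add(id);
               }
           }
           return result == null ? Collections.emptyList() : result;
       }

       // Placeholder so the sketch compiles; the real check is
       // ClusterConnectionStates#isConnectionSetupTimeout.
       static boolean isTimedOut(final String id, final long now) {
           return false;
       }
   }
   ```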









[GitHub] [kafka] omkreddy commented on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-09 Thread GitBox


omkreddy commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-656000189


   @tombentley Thanks for the explanation. I will go ahead and merge the PR.







[GitHub] [kafka] omkreddy closed pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-09 Thread GitBox


omkreddy closed pull request #8966:
URL: https://github.com/apache/kafka/pull/8966


   







[GitHub] [kafka] showuon commented on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-09 Thread GitBox


showuon commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-656008310


   @omkreddy, sorry, I see the PR was closed instead of merged. Could you please check again? Thanks.







[jira] [Created] (KAFKA-10253) Kafka Connect gets into a rebalance loop

2020-07-09 Thread Konstantin Lalafaryan (Jira)
Konstantin Lalafaryan created KAFKA-10253:
-

 Summary: Kafka Connect gets into a rebalance loop
 Key: KAFKA-10253
 URL: https://issues.apache.org/jira/browse/KAFKA-10253
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.5.0
Reporter: Konstantin Lalafaryan


Hello everyone!

 

We are running a kafka-connect cluster (3 workers) and very often it gets into a rebalance loop.

 
{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Work
{code}


[GitHub] [kafka] omkreddy edited a comment on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-09 Thread GitBox


omkreddy edited a comment on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-656014523


   @showuon I normally use the `kafka-merge-pr.py` script for merging PRs. It does a direct push from local, so GitHub marks the PR as closed. You can check the commit on trunk.









[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop

2020-07-09 Thread Konstantin Lalafaryan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Lalafaryan updated KAFKA-10253:
--
Summary: Kafka Connect gets into an infinite rebalance loop  (was: Kafka 
Connect gets into a rebalance loop)

> Kafka Connect gets into an infinite rebalance loop
> --
>
> Key: KAFKA-10253
> URL: https://issues.apache.org/jira/browse/KAFKA-10253
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0
>Reporter: Konstantin Lalafaryan
>Priority: Blocker
>

[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop

2020-07-09 Thread Konstantin Lalafaryan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Lalafaryan updated KAFKA-10253:
--
Component/s: KafkaConnect

> Kafka Connect gets into an infinite rebalance loop
> --
>
> Key: KAFKA-10253
> URL: https://issues.apache.org/jira/browse/KAFKA-10253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Konstantin Lalafaryan
>Priority: Blocker
>

[GitHub] [kafka] showuon commented on pull request #8966: KAFKA-10220: add null check for configurationKey

2020-07-09 Thread GitBox


showuon commented on pull request #8966:
URL: https://github.com/apache/kafka/pull/8966#issuecomment-656019261


   Cool! Thanks, @omkreddy !







[GitHub] [kafka] omkreddy closed pull request #8808: KAFKA-10109: Fix double AdminClient creation in AclCommand

2020-07-09 Thread GitBox


omkreddy closed pull request #8808:
URL: https://github.com/apache/kafka/pull/8808


   







[jira] [Resolved] (KAFKA-10109) kafka-acls.sh/AclCommand opens multiple AdminClients

2020-07-09 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-10109.
---
Fix Version/s: 2.7.0
   Resolution: Fixed

Issue resolved by pull request 8808
[https://github.com/apache/kafka/pull/8808]

> kafka-acls.sh/AclCommand opens multiple AdminClients
> 
>
> Key: KAFKA-10109
> URL: https://issues.apache.org/jira/browse/KAFKA-10109
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
> Fix For: 2.7.0
>
>
> {{AclCommand.AclCommandService}} uses {{withAdminClient(opts: AclCommandOptions)(f: Admin => Unit)}} to abstract the execution of an action using an {{AdminClient}} instance. Unfortunately, the use of this method in implementing {{addAcls()}} and {{removeAcls()}} calls {{listAcls()}}, which causes the creation of a second {{AdminClient}} instance. When the {{--command-config}} option has been used to specify a {{client.id}} for the admin client, the second instance fails to register an MBean, resulting in a warning being logged.
> {code}
> ./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config 
> config/broker_connection.conf.reproducing --add --allow-principal User:alice 
> --operation Describe --topic 'test' --resource-pattern-type prefixed
> Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, 
> patternType=PREFIXED)`: 
>   (principal=User:alice, host=*, operation=DESCRIBE, 
> permissionType=ALLOW) 
> [2020-06-03 18:43:12,190] WARN Error registering AppInfo mbean 
> (org.apache.kafka.common.utils.AppInfoParser)
> javax.management.InstanceAlreadyExistsException: 
> kafka.admin.client:type=app-info,id=administrator_data
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
>   at org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
>   at 
> org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
>   at 
> kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:105)
>   at 
> kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:146)
>   at 
> kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1(AclCommand.scala:123)
>   at 
> kafka.admin.AclCommand$AdminClientService.$anonfun$addAcls$1$adapted(AclCommand.scala:116)
>   at 
> kafka.admin.AclCommand$AdminClientService.withAdminClient(AclCommand.scala:108)
>   at 
> kafka.admin.AclCommand$AdminClientService.addAcls(AclCommand.scala:116)
>   at kafka.admin.AclCommand$.main(AclCommand.scala:78)
>   at kafka.admin.AclCommand.main(AclCommand.scala)
> Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, 
> patternType=PREFIXED)`: 
>   (principal=User:alice, host=*, operation=DESCRIBE, permissionType=ALLOW)
> {code}
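A hedged sketch of the bug shape described above (illustrative Java, not the Scala AclCommand code): a wrapper that creates a fresh Admin per call means a nested call opens a second client while the first is still alive, so both try to register an app-info MBean under the same client.id.
{code:java}
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.clients.admin.Admin;

final class AclCommandSketch {
    // Each invocation creates (and closes) its own Admin instance.
    static void withAdmin(final Properties config, final Consumer<Admin> action) {
        try (Admin admin = Admin.create(config)) {
            action.accept(admin);
        }
    }

    static void addAcls(final Properties config) {
        withAdmin(config, admin -> {
            // ... create the ACLs ...
            listAcls(config); // opens a SECOND Admin while the first is still open
        });
    }

    static void listAcls(final Properties config) {
        withAdmin(config, admin -> { /* ... describe the ACLs ... */ });
    }
}
{code}
One way out is to pass the already-open client into the nested listing instead of re-entering the wrapper, avoiding the second instance and the duplicate MBean registration; the merged PR may differ in detail.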





[GitHub] [kafka] omkreddy commented on a change in pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-09 Thread GitBox


omkreddy commented on a change in pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#discussion_r452092180



##
File path: 
core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -74,48 +75,58 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
   @Test
   def testProducerWithAuthenticationFailure(): Unit = {
     val producer = createProducer()
-    verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
-    verifyAuthenticationException(producer.partitionsFor(topic))
+    try {
+      verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
+      verifyAuthenticationException(producer.partitionsFor(topic))
 
-    createClientCredential()
-    verifyWithRetry(sendOneRecord(producer))
+      createClientCredential()
+      verifyWithRetry(sendOneRecord(producer))
+    } finally producer.close()

Review comment:
   @akatona84 Thanks for the PR. Normally, producer/consumer instances created in a test are closed in [IntegrationTestHarness.tearDown](https://github.com/confluentinc/ce-kafka/blob/master/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala#L152). Many core tests extend the `IntegrationTestHarness` class.
   Do we know which tests failed?









[jira] [Commented] (KAFKA-10252) MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores

2020-07-09 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154406#comment-17154406
 ] 

Bruno Cadonna commented on KAFKA-10252:
---

Thank you for the bug report, [~bulbfreeman].

I think this is a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-10179. There is also an open PR for 
fixing the bug linked in that ticket. 

> MeteredTimestampedKeyValueStore not setting up correct topic name for 
> GlobalKTable associating StateStores
> --
>
> Key: KAFKA-10252
> URL: https://issues.apache.org/jira/browse/KAFKA-10252
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Gordon Wang
>Priority: Major
> Attachments: 
> 0001-KAFKA-10252-Use-source-topic-name-for-MeteredTimesta.patch
>





[GitHub] [kafka] soarez opened a new pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-07-09 Thread GitBox


soarez opened a new pull request #9000:
URL: https://github.com/apache/kafka/pull/9000


   Following up on #8752, which seems to have gone stale.
   
   @mjsax can you continue the review?







[GitHub] [kafka] akatona84 commented on a change in pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-09 Thread GitBox


akatona84 commented on a change in pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#discussion_r452108786



##
File path: 
core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
##
@@ -74,48 +75,58 @@ class SaslClientsWithInvalidCredentialsTest extends 
IntegrationTestHarness with
   @Test
   def testProducerWithAuthenticationFailure(): Unit = {
 val producer = createProducer()
-verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
-verifyAuthenticationException(producer.partitionsFor(topic))
+try {
+  verifyAuthenticationException(sendOneRecord(producer, maxWaitMs = 1))
+  verifyAuthenticationException(producer.partitionsFor(topic))
 
-createClientCredential()
-verifyWithRetry(sendOneRecord(producer))
+  createClientCredential()
+  verifyWithRetry(sendOneRecord(producer))
+} finally producer.close()

Review comment:
   Oh, right, it was suspicious that I did not see a consumer thread in the 
failed .classMethod cases :) Thanks.
testConsumerGroupServiceWithAuthenticationSuccess failed and left admin 
client threads behind for the later tests:
   ```
   java.lang.AssertionError: Found unexpected threads during @AfterClass, 
allThreads=Set(Test worker, metrics-meter-tick-thread-2, main, Signal 
Dispatcher, Reference Handler, Finalizer, /0:0:0:0:0:0:0:1:55620 to 
/0:0:0:0:0:0:0:1:33045 workers Thread 2, kafka-admin-client-thread | 
adminclient-45, /0:0:0:0:0:0:0:1:55620 to /0:0:0:0:0:0:0:1:33045 workers Thread 
3, metrics-meter-tick-thread-1), unexpected=Set(kafka-admin-client-thread | 
adminclient-45)
   ```
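   
   For tests that create their own admin clients outside the harness, closing 
them deterministically avoids exactly this kind of leak. A minimal sketch 
(illustrative bootstrap address; exception handling elided):
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;

   Properties props = new Properties();
   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   // try-with-resources shuts the "kafka-admin-client-thread" down deterministically
   try (Admin admin = Admin.create(props)) {
       admin.listTopics().names().get();
   }
   ```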





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] akatona84 commented on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-09 Thread GitBox


akatona84 commented on pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#issuecomment-656038173


   Thanks @omkreddy for explaining how the consumers and producers are 
closed. I updated my PR so it only closes things where necessary.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] akatona84 edited a comment on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-09 Thread GitBox


akatona84 edited a comment on pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#issuecomment-656038173


   Thanks @omkreddy for explaining how the consumers and producers are closed 
(I had missed that). I updated my PR so it only closes things where necessary.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-09 Thread GitBox


omkreddy commented on pull request #8992:
URL: https://github.com/apache/kafka/pull/8992#issuecomment-656045969


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452119998



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,24 +45,23 @@
 protected ProcessorNode currentNode;
 private long currentSystemTimeMs;
 
-final StateManager stateManager;

Review comment:
   This is basically to avoid the explicit cast from `StateManager` to 
`ProcessorStateManager` in `ProcessorContextImpl`. `ProcessorStateManager` has 
the method `registeredChangelogPartitionFor()`, which exists only in 
`ProcessorStateManager` because it is only used in `ProcessorContextImpl`. 
Hence, I moved the state manager field to the respective children and 
introduced the abstract method `stateManager()` in `AbstractProcessorContext`. 
Method `stateManager()` returns a `ProcessorStateManager` in 
`ProcessorContextImpl` but a `StateManager` in all other places.
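   
   A stripped-down sketch of that covariant-return pattern (marker types only; 
not the real Kafka class bodies):
   ```java
   interface StateManager { /* common state-manager surface */ }

   class ProcessorStateManager implements StateManager {
       String registeredChangelogPartitionFor(String storeName) {
           return storeName + "-changelog"; // placeholder body for the sketch
       }
   }

   abstract class AbstractProcessorContext {
       // Children narrow the return type, so call sites need no cast.
       abstract StateManager stateManager();
   }

   class ProcessorContextImpl extends AbstractProcessorContext {
       private final ProcessorStateManager stateManager = new ProcessorStateManager();

       @Override
       ProcessorStateManager stateManager() { // covariant return type
           return stateManager;
       }
   }
   ```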





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik opened a new pull request #9001: KAFKA-10028: Implement KIP-584 write path

2020-07-09 Thread GitBox


kowshik opened a new pull request #9001:
URL: https://github.com/apache/kafka/pull/9001


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10028) Implement write path for feature versioning scheme

2020-07-09 Thread Kowshik Prakasam (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam reassigned KAFKA-10028:


Assignee: Kowshik Prakasam

> Implement write path for feature versioning scheme
> --
>
> Key: KAFKA-10028
> URL: https://issues.apache.org/jira/browse/KAFKA-10028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
>
> Goal is to implement various classes and integration for the write path of 
> the feature versioning system 
> ([KIP-584|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features]).
>  This is preceded by the read path implementation (KAFKA-10027). The write 
> path implementation involves developing the new controller API: 
> UpdateFeatures that enables transactional application of a set of 
> cluster-wide feature updates to the ZK {{'/features'}} node, along with 
> required ACL permissions.
>  
> Details about the write path are explained [in this 
> part|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP-584:Versioningschemeforfeatures-ChangestoKafkaController]
>  of the KIP.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy merged pull request #3480: MINOR: Define the term tombstone, since it's used elsewhere in the docs

2020-07-09 Thread GitBox


omkreddy merged pull request #3480:
URL: https://github.com/apache/kafka/pull/3480


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-07-09 Thread GitBox


mimaison commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-656062630


   @omkreddy @ijuma What's your opinion on this change? Currently a number of 
config links collide, and this change addresses that. However, it will also 
break existing links.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-09 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r452140714



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
 val version = request.header.apiVersion
 
-val mergedResponseMap = if (version == 0)
+val topics = if (version == 0)
   handleListOffsetRequestV0(request)
 else
   handleListOffsetRequestV1AndAbove(request)
 
-sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+  .setThrottleTimeMs(requestThrottleMs)
+  .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
 val correlationId = request.header.correlationId
 val clientId = request.header.clientId
 val offsetRequest = request.body[ListOffsetRequest]
 
-val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-  DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-  k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-}
-
-val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-  try {
-val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-  topicPartition = topicPartition,
-  timestamp = partitionData.timestamp,
-  maxNumOffsets = partitionData.maxNumOffsets,
-  isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-  fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-  } catch {
-// NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-// are typically transient and there is no value in logging the entire 
stack trace for the same
-case e @ (_ : UnknownTopicOrPartitionException |
-  _ : NotLeaderForPartitionException |
-  _ : KafkaStorageException) =>
-  debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-correlationId, clientId, topicPartition, e.getMessage))
-  (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-case e: Throwable =>
-  error("Error while responding to offset request", e)
-  (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-  }
-}
-responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-val correlationId = request.header.correlationId
-val clientId = request.header.clientId
-val offsetRequest = request.body[ListOffsetRequest]
-
-val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-  DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-  k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-ListOffsetResponse.UNKNOWN_TIMESTAMP,
-ListOffsetResponse.UNKNOWN_OFFSET,
-Optional.empty())
-}
-
-val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-  if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-s"failed because the partition is duplicated in the request.")
-(topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-  ListOffsetResponse.UNKNOWN_TIMESTAMP,
-  ListOffsetResponse.UNKNOWN_OFFSET,
-  Optional.empty()))
-  } else {
-
-def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-  (topicPartition, new ListOffsetResponse.PartitionData(
-e,
-ListOffsetResponse.UNKNOWN_TIMESTAMP,
-   

[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452148874



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
 @SuppressWarnings("unchecked")
 void initStoreSerde(final ProcessorContext context) {
+final String storeName = name();
+final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName);
 serdes = new StateSerdes<>(
-ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
+ changelogTopic != null ?
+changelogTopic :
+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   I think we still need the if-then-else. 
`ProcessorContextUtils.changelogFor()` checks that the processor context is of 
type `InternalProcessorContext` in order to call `changelogFor()` on it. It 
does not consider the case where no changelog exists (i.e., `changelogFor()` 
returns `null`). The `null` check is done here. In both cases the fallback is 
`ProcessorStateManager.storeChangelogTopic()`.  
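   
   One plausible shape for the helper, consistent with the description above 
(hedged sketch; it assumes a non-internal context is treated as "no changelog 
known" and therefore also falls through to the caller's 
`storeChangelogTopic()` fallback):
   ```java
   public static String changelogFor(final ProcessorContext context, final String storeName) {
       return context instanceof InternalProcessorContext
           ? ((InternalProcessorContext) context).changelogFor(storeName) // may return null
           : null;
   }
   ```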





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when broker can't be connected

2020-07-09 Thread xiaotong.wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaotong.wang updated KAFKA-10254:
--
Description: 
steps

1. Start the Kafka broker.

2. Start a Kafka consumer, subscribe to some topics with a KafkaConsumer 
instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set 
auto.commit.enabled=false.

3. Use iptables to block the Kafka broker IP in the client VM, or shut down 
the Kafka brokers.

4. CPU goes to 100%.

*why?*

left version: 2.3.1

right version: 2.5.0

For 2.3.1, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded 
blocks for x ms and returns empty records:

!image-2020-07-09-19-24-20-604.png|width=993,height=176!

For 2.5.0:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer 
timer) {
 *long pollTimeout = coordinator == null ? timer.remainingMs() :*
 *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
timer.remainingMs());*

I checked the Kafka client source: the poll timeout is reduced to 0 ms once 
the heartbeat has timed out, so poll is called without blocking at all, which 
drives the CPU to 100%.

  was:
steps

1. Start the Kafka broker.

2. Start a Kafka consumer, subscribe to some topics with a KafkaConsumer 
instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set 
auto.commit.enabled=false.

3. Use iptables to block the Kafka broker IP in the client VM, or shut down 
the Kafka brokers.

4. CPU goes to 100%.

*why?*

left version: 2.3.1

right version: 2.5.0

For 2.3.1, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded 
blocks for x ms and returns empty records:

!image-2020-07-09-19-24-20-604.png!

For 2.5.0:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer 
timer) {
 *long pollTimeout = coordinator == null ? timer.remainingMs() :*
 *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
timer.remainingMs());*

I checked the Kafka client source: the poll timeout is reduced to 0 ms once 
the heartbeat has timed out, so poll is called without blocking at all, which 
drives the CPU to 100%.


> 100% CPU usage by KafkaConsumer poll when broker can't be connected
> 
>
> Key: KAFKA-10254
> URL: https://issues.apache.org/jira/browse/KAFKA-10254
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: xiaotong.wang
>Priority: Critical
> Attachments: image-2020-07-09-19-24-20-604.png
>
>
> steps
> 1. Start the Kafka broker.
> 2. Start a Kafka consumer, subscribe to some topics with a KafkaConsumer 
> instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and 
> set auto.commit.enabled=false.
> 3. Use iptables to block the Kafka broker IP in the client VM, or shut 
> down the Kafka brokers.
> 4. CPU goes to 100%.
>  
> *why?*
>  
> left version: 2.3.1
> right version: 2.5.0
>  
> For 2.3.1, when the Kafka brokers go down, 
> updateAssignmentMetadataIfNeeded blocks for x ms and returns empty 
> records:
> !image-2020-07-09-19-24-20-604.png|width=993,height=176!
>  
> For 2.5.0:
> private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer 
> timer) {
>  *long pollTimeout = coordinator == null ? timer.remainingMs() :*
>  *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
> timer.remainingMs());*
> I checked the Kafka client source: the poll timeout is reduced to 0 ms 
> once the heartbeat has timed out, so poll is called without blocking at 
> all, which drives the CPU to 100%.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when broker can't be connected

2020-07-09 Thread xiaotong.wang (Jira)
xiaotong.wang created KAFKA-10254:
-

 Summary: 100% CPU usage by KafkaConsumer poll when broker can't 
be connected 
 Key: KAFKA-10254
 URL: https://issues.apache.org/jira/browse/KAFKA-10254
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.5.0
Reporter: xiaotong.wang
 Attachments: image-2020-07-09-19-24-20-604.png

steps

1. Start the Kafka broker.

2. Start a Kafka consumer, subscribe to some topics with a KafkaConsumer 
instance, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, and set 
auto.commit.enabled=false.

3. Use iptables to block the Kafka broker IP in the client VM, or shut down 
the Kafka brokers.

4. CPU goes to 100%.

*why?*

left version: 2.3.1

right version: 2.5.0

For 2.3.1, when the Kafka brokers go down, updateAssignmentMetadataIfNeeded 
blocks for x ms and returns empty records:

!image-2020-07-09-19-24-20-604.png!

For 2.5.0:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer 
timer) {
 *long pollTimeout = coordinator == null ? timer.remainingMs() :*
 *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), 
timer.remainingMs());*

I checked the Kafka client source: the poll timeout is reduced to 0 ms once 
the heartbeat has timed out, so poll is called without blocking at all, which 
drives the CPU to 100%.

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
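
As an editorial aside: until the timeout calculation itself is fixed, an 
application-side backoff keeps a disconnected consumer from spinning. A 
hedged sketch (the record handler is reduced to a println; this is not a fix 
for the underlying issue):

{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class PollLoop {
    static volatile boolean running = true;

    static void run(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                Thread.sleep(50); // crude backoff so an internal 0 ms poll timeout cannot pin a core
                continue;
            }
            records.forEach(record -> System.out.println(record)); // placeholder handler
        }
    }
}
{code}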




[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r452162828



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+import java.util.List;
+
+/**
+ * A {@link SampledStat} that mimics the behavior of a Token Bucket and is 
meant to
+ * be used in conjunction with a {@link Rate} and a {@link 
org.apache.kafka.common.metrics.Quota}.
+ *
+ * The {@link TokenBucket} considers each sample as the amount of credits 
spent during the sample's
+ * window while giving back credits based on the defined quota.
+ *
+ * At time T, it computes the total O as follows:
+ * - O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T)
+ * Where:
+ * - Q is the defined Quota or 0 if undefined
+ * - W is the time of the sample or now if undefined
+ * - S is the value of the sample or 0 if undefined
+ *
+ * Example with 3 samples with a Quota = 2:
+ * - S1 at T+0s => 4
+ * - S2 at T+2s => 2
+ * - S3 at T+4s => 6
+ *
+ * The total at T+6s is computed as follows:
+ * - T0 => Total at T+0s => S1 = 4
+ * - T1 => Total at T+2s => max(0, T0 - Q * dT) + S2 = (4 - 2 * 2) + 2 = 2
+ * - T2 => Total at T+4s => max(0, T1 - Q * dT) + S3 = (2 - 2 * 2) + 6 = 6
+ * - T3 => Total at T+6s => max(0, T2 - Q * dT) = (6 - 2 * 2) = 2
+ */
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+protected void update(Sample sample, MetricConfig config, double value, 
long now) {
+sample.value += value;
+}
+
+@Override

Review comment:
   My first implementation was doing what you described: backfilling past 
samples up to quota * sample length and putting the remainder in the last (or 
current) sample. You seem to use "sample" in the singular quite often in your 
description, so I wonder whether you also meant to backfill all the past 
samples. Did you?
   
   It turned out that backfilling past samples is not as straightforward as it 
seems because, as you pointed out in your other comment, we may have holes or 
no past samples at all. Let me take a few examples to illustrate.
   
   Let's assume that we have the following settings:
   - samples = 6
   - window = 1s
   - quota = 5
   
   Example 1:
   Let's assume that we record 2 at T and 30 at T+5. When 2 is recorded, a 
first sample at T is created with the value 2. Then, we record 30 at T+5. As 
it is above the quota of the current sample, we want to backfill T, T+1, T+2, 
T+3, T+4 and put the remainder in T+5. This means that we have to create 
samples for T+1, T+2, T+3 and T+4 because they don't exist in memory. We 
would end up with the following samples in memory:
   T = 5, T+1 = 5, T+2 = 5, T+3 = 5, T+4 = 5, T+5 = 7.
   
   Example 2:
   Let's assume that we record 2 at T, 3 at T+3 and 30 at T+5. The mechanics 
are similar to the previous example, except that it requires adding new 
samples between existing past samples (T+1 and T+2), as well as adding sample 
T+4. We would end up with the following samples in memory:
   T = 5, T+1 = 5, T+2 = 5, T+3 = 5, T+4 = 5, T+5 = 11.
   
   With the current base implementation of SampledStat, adding all the past 
samples, especially in between existing past samples, is quite complex and 
possibly error-prone. The simplest approach would be to always allocate all 
the samples, but this is something I would like to avoid.
   
   Following this observation, I started to look at the problem from a 
different angle. Instead of spreading the value at record time, I thought 
that we could adjust the values while combining the samples. The idea is to 
run a token bucket algorithm in the combine method, based on all the 
non-expired samples.
   
   My implementation is basically a reversed token bucket: a token bucket 
that starts at zero instead of starting at the maximum number of tokens.
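   
   To make that concrete, a hedged standalone sketch of the reversed token 
bucket combine (plain arrays instead of the `Sample` class; it reproduces the 
javadoc example above):
   ```java
   // Walk the samples oldest-first, paying back 'quotaPerSec' credits per second
   // between observations, as in the O(T) formula in the class javadoc.
   static double combine(long[] timesMs, double[] values, double quotaPerSec, long nowMs) {
       double total = 0.0;
       long lastMs = timesMs.length > 0 ? timesMs[0] : nowMs;
       for (int i = 0; i < timesMs.length; i++) {
           total = Math.max(0.0, total - quotaPerSec * (timesMs[i] - lastMs) / 1000.0) + values[i];
           lastMs = timesMs[i];
       }
       // Credits given back since the newest sample.
       return Math.max(0.0, total - quotaPerSec * (nowMs - lastMs) / 1000.0);
   }

   // combine(new long[] {0, 2000, 4000}, new double[] {4, 2, 6}, 2, 6000) == 2.0,
   // matching the T+6s total in the TokenBucket javadoc example.
   ```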

[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r452162969



##
File path: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
##
@@ -97,7 +97,25 @@ public static RecordingLevel forName(String name) {
 public boolean shouldRecord(final int configId) {
 return configId == DEBUG.id || configId == this.id;
 }
+}
 
+public enum QuotaEnforcementType {
+/**
+ * The quota is not enforced.
+ */
+NONE,
+
+/**
+ * The quota is enforced before updating the sensor. An update
+ * is rejected if the quota is already exhausted.

Review comment:
   That is indeed much better.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452165061



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -73,22 +73,23 @@
 private final Set<String> globalNonPersistentStoresTopics = new 
HashSet<>();
 private final OffsetCheckpoint checkpointFile;
 private final Map<TopicPartition, Long> checkpointFileCache;
+private final Map<String, String> storeToChangelogTopic;
 
 public GlobalStateManagerImpl(final LogContext logContext,
   final ProcessorTopology topology,
   final Consumer<byte[], byte[]> 
globalConsumer,
   final StateDirectory stateDirectory,
   final StateRestoreListener 
stateRestoreListener,
   final StreamsConfig config) {
+storeToChangelogTopic = topology.storeToChangelogTopic();

Review comment:
   I would still pass `ProcessorTopology` into the constructor because it 
might make the signature of the constructor more stable. I removed the field 
for the topology and now we store only `globalStateStores` and 
`storeChangelogTopics` in `GlobalStateManagerImpl`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8977: KAFKA-10162; Quota Enhancements (KIP-599)

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r452166182



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+import java.util.List;
+
+/**
+ * A {@link SampledStat} that mimics the behavior of a Token Bucket and is 
meant to
+ * be used in conjunction with a {@link Rate} and a {@link 
org.apache.kafka.common.metrics.Quota}.
+ *
+ * The {@link TokenBucket} considers each sample as the amount of credits 
spent during the sample's
+ * window while giving back credits based on the defined quota.
+ *
+ * At time T, it computes the total O as follows:
+ * - O(T) = max(0, O(T-1) - Q * (W(T) - W(T-1))) + S(T)
+ * Where:
+ * - Q is the defined Quota or 0 if undefined
+ * - W is the time of the sample or now if undefined
+ * - S is the value of the sample or 0 if undefined
+ *
+ * Example with 3 samples with a Quota = 2:
+ * - S1 at T+0s => 4
+ * - S2 at T+2s => 2
+ * - S3 at T+4s => 6
+ *
+ * The total at T+6s is computed as follows:
+ * - T0 => Total at T+0s => S1 = 4
+ * - T1 => Total at T+2s => max(0, T0 - Q * dT) + S2 = (4 - 2 * 2) + 2 = 2
+ * - T2 => Total at T+4s => max(0, T1 - Q * dT) + S3 = (2 - 2 * 2) + 6 = 6
+ * - T3 => Total at T+6s => max(0, T2 - Q * dT) = (6 - 2 * 2) = 2
+ */
+public class TokenBucket extends SampledStat {
+
+private final TimeUnit unit;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+super(0);
+this.unit = unit;
+}
+
+@Override
+protected void update(Sample sample, MetricConfig config, double value, 
long now) {
+sample.value += value;
+}
+
+@Override
+public double combine(List<Sample> samples, MetricConfig config, long now) 
{
+if (this.samples.isEmpty())
+return 0;
+
+final double quota = config.quota() != null ? config.quota().bound() : 
0;
+final int startIndex = (this.current + 1) % this.samples.size();
+
+long lastWindowMs = Long.MAX_VALUE;
+double total = 0.0;
+
+for (int i = 0; i < this.samples.size(); i++) {

Review comment:
   1. It does not cause a problem here. As you said, `purgeObsoleteSamples` 
sets the value to zero and `lastWindowMs` to now when it resets a sample. 
When the past `lastWindowMs` is higher than the current sample's 
`lastWindowMs` or `now`, it has no effect: `Math.max(0, nowMs - lastWindowMs)`.
   
   2. Right. I take the current sample's `lastWindowMs` minus the previous 
sample's `lastWindowMs` to take this into account.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-09 Thread Luke Chen (Jira)
Luke Chen created KAFKA-10255:
-

 Summary: Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
 Key: KAFKA-10255
 URL: https://issues.apache.org/jira/browse/KAFKA-10255
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen
Assignee: Luke Chen


https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0

org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testOneWayReplicationWithAutorOffsetSync1 STARTED
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout

org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testOneWayReplicationWithAutorOffsetSync1 FAILED
 java.lang.AssertionError: consumer record size is not zero expected:<0> but 
was:<2>
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.failNotEquals(Assert.java:835)
 at org.junit.Assert.assertEquals(Assert.java:647)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy merged pull request #8992: MINOR: Closing resources in SaslClientsWithInvalidCredentialsTest

2020-07-09 Thread GitBox


omkreddy merged pull request #8992:
URL: https://github.com/apache/kafka/pull/8992


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656181031


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656181429


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #9002: MINOR: Add ApiMessageTypeGenerator

2020-07-09 Thread GitBox


cmccabe opened a new pull request #9002:
URL: https://github.com/apache/kafka/pull/9002


   Previously, we had some code hard-coded to generate message type classes
   for RPCs.  We might want to generate message type classes for other
   things as well, so make it more generic.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10256) Create a server gradle module for Java code needed only by servers

2020-07-09 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-10256:


 Summary: Create a server gradle module for Java code needed only 
by servers
 Key: KAFKA-10256
 URL: https://issues.apache.org/jira/browse/KAFKA-10256
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


It doesn't really make sense to have a "server" directory in the "clients" 
gradle module.  The client is not the server.  Instead, we should have a 
separate gradle module for code which is server-specific.

This will avoid polluting the client CLASSPATH with code which is internal to 
the server, and make the functional division of the code clearer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] joel-hamill commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-07-09 Thread GitBox


joel-hamill commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-656205597


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10240) Sink tasks should not throw WakeupException on shutdown

2020-07-09 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-10240:
--
Description: 
* When a task is scheduled for shutdown, the framework [wakes up the 
consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159]
 for that task.

 * As is noted in the [Javadocs for that 
method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348],
 “If no thread is blocking in a method which can throw 
{{org.apache.kafka.common.errors.WakeupException}}, the next call to such a 
method will raise it instead.”

 * It just so happens that, if the framework isn’t in the middle of a call to 
the consumer and then the task gets stopped, the next call the framework will 
make on the consumer may be to commit offsets, which will immediately throw a 
{{WakeupException}}.

 * Currently, the framework handles this by [immediately retrying the offset 
commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339]
 until it either throws a different exception or succeeds, and then throwing 
the original {{WakeupException}}. If this synchronous commit of offsets occurs 
during task shutdown (as opposed to in response to a consumer rebalance), it's 
unnecessary to throw the {{WakeupException}} back to the caller, and can cause 
alarming {{ERROR}}-level messages to get logged by the worker.

  was:
* When a task is scheduled for shutdown, the framework [wakes up the 
consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159]
 for that task.

 * As is noted in the [Javadocs for that 
method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348],
 “If no thread is blocking in a method which can throw 
{{org.apache.kafka.common.errors.WakeupException}}, the next call to such a 
method will raise it instead.”

 * It just so happens that, if the framework isn’t in the middle of a call to 
the consumer and then the task gets stopped, the next call the framework will 
make on the consumer may be to commit offsets, which will immediately throw a 
{{WakeupException}}.

 * Currently, the framework handles this by [immediately retrying the offset 
commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339]
 until it either throws a different exception or succeeds, and then throwing 
the original {{WakeupException}}. Since this synchronous commit of offsets only 
occurs during task shutdown, it's unnecessary to throw the {{WakeupException}} 
back to the caller, and can cause alarming {{ERROR}}-level messages to get 
logged by the worker.


> Sink tasks should not throw WakeupException on shutdown
> ---
>
> Key: KAFKA-10240
> URL: https://issues.apache.org/jira/browse/KAFKA-10240
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 
> 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> * When a task is scheduled for shutdown, the framework [wakes up the 
> consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159]
>  for that task.
>  * As is noted in the [Javadocs for that 
> method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348],
>  “If no thread is blocking in a method which can throw 
> {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a 
> method will raise it instead.”
>  * It just so happens that, if the framework isn’t in the middle of a call to 
> the consumer and then the task gets stopped, the next call the framework will 
> make on the consumer may be to commit offsets, which will immediately throw a 
> {{WakeupException}}.
>  * Currently, the framework handles this by [immediately retrying the offset 
> commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339]
>  until it either throws a

[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-09 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r452324069



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -965,11 +994,11 @@ public void onFailure(RuntimeException e) {
  * @return A response which can be polled to obtain the corresponding 
timestamps and offsets.
  */
 private RequestFuture sendListOffsetRequest(final Node 
node,
-  final 
Map timestampsToSearch,
+  final 
Map timestampsToSearch,
   boolean 
requireTimestamp) {
 ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
 .forConsumer(requireTimestamp, isolationLevel)
-.setTargetTimes(timestampsToSearch);
+.setTargetTimes(toListOffsetTopics(timestampsToSearch));

Review comment:
   I initially tried to do that, but there are a couple of intermediate 
collections using `TopicPartition` in these methods, which makes it really 
hard to update them. For example:
   - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L735
   - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L880
   - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L887
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8981: KAFKA-10235 Fix flaky transactions_test.py

2020-07-09 Thread GitBox


junrao commented on pull request #8981:
URL: https://github.com/apache/kafka/pull/8981#issuecomment-656214847


   11 failures in system tests. 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-08--001.1594266883--chia7712--KAFKA-10235--a76224fff/report.html
   
   @chia7712: Did this PR fix the transaction tests as expected?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10240) Sink tasks should not throw WakeupException on shutdown

2020-07-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154702#comment-17154702
 ] 

Chris Egerton commented on KAFKA-10240:
---

For some background: this was actually discussed years ago on 
[https://github.com/apache/kafka/pull/1511], but never implemented.

> Sink tasks should not throw WakeupException on shutdown
> ---
>
> Key: KAFKA-10240
> URL: https://issues.apache.org/jira/browse/KAFKA-10240
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 
> 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> * When a task is scheduled for shutdown, the framework [wakes up the 
> consumer|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L159]
>  for that task.
>  * As is noted in the [Javadocs for that 
> method|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2348],
>  “If no thread is blocking in a method which can throw 
> {{org.apache.kafka.common.errors.WakeupException}}, the next call to such a 
> method will raise it instead.”
>  * It just so happens that, if the framework isn’t in the middle of a call to 
> the consumer and then the task gets stopped, the next call the framework will 
> make on the consumer may be to commit offsets, which will immediately throw a 
> {{WakeupException}}.
>  * Currently, the framework handles this by [immediately retrying the offset 
> commit|https://github.com/apache/kafka/blob/8a24da376b801c6eb6522ad4861b83f5beb5826c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L337-L339]
>  until it either throws a different exception or succeeds, and then throwing 
> the original {{WakeupException}}. If this synchronous commit of offsets 
> occurs during task shutdown (as opposed to in response to a consumer 
> rebalance), it's unnecessary to throw the {{WakeupException}} back to the 
> caller, and can cause alarming {{ERROR}}-level messages to get logged by the 
> worker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
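
For reference, the standard wakeup-based shutdown idiom from the consumer 
Javadocs, in which the WakeupException is swallowed only when it was 
triggered by an intentional shutdown (a sketch; error handling trimmed):

{code:java}
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

class ShutdownAwareLoop {
    final AtomicBoolean running = new AtomicBoolean(true);

    void runLoop(KafkaConsumer<byte[], byte[]> consumer) {
        try {
            while (running.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                // process records here
            }
        } catch (WakeupException e) {
            if (running.get())
                throw e; // the wakeup was not part of a shutdown: surface it
            // otherwise: deliberate shutdown wakeup, swallow it
        } finally {
            consumer.close();
        }
    }

    void shutdown(KafkaConsumer<byte[], byte[]> consumer) {
        running.set(false);
        consumer.wakeup(); // interrupts a blocking poll() from another thread
    }
}
{code}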


[jira] [Created] (KAFKA-10257) system test kafkatest.tests.core.security_rolling_upgrade_test fails

2020-07-09 Thread Jun Rao (Jira)
Jun Rao created KAFKA-10257:
---

 Summary: system test 
kafkatest.tests.core.security_rolling_upgrade_test fails
 Key: KAFKA-10257
 URL: https://issues.apache.org/jira/browse/KAFKA-10257
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Jun Rao


The test failure was reported in 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-08--001.1594266883--chia7712--KAFKA-10235--a76224fff/report.html

Saw the following error in the log.

{code:java}
[2020-07-09 00:56:37,575] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: Could not find a 'KafkaServer' or 
'internal.KafkaServer' entry in the JAAS configuration. System property 
'java.security.auth.login.config' is not set
at 
org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
at 
org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
at 
org.apache.kafka.common.security.JaasContext.loadServerContext(JaasContext.java:70)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:131)
at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
at kafka.network.Processor.(SocketServer.scala:780)
at kafka.network.SocketServer.newProcessor(SocketServer.scala:406)
at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:285)
at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:284)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:251)
at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:248)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:248)
at kafka.network.SocketServer.startup(SocketServer.scala:122)
at kafka.server.KafkaServer.startup(KafkaServer.scala:297)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
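
The failure itself is mechanical: JaasContext.loadServerContext requires a 
KafkaServer (or internal.KafkaServer) section in the JAAS file named by the 
java.security.auth.login.config system property. For reference, a minimal 
sketch of such a file, with purely illustrative SASL/PLAIN credentials (the 
failing test may use a different mechanism):

{code}
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
{code}

The broker JVM would be started with 
-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf (path 
illustrative); the error suggests this property was not set during the rolling 
upgrade.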


[GitHub] [kafka] chia7712 commented on pull request #8981: KAFKA-10235 Fix flaky transactions_test.py

2020-07-09 Thread GitBox


chia7712 commented on pull request #8981:
URL: https://github.com/apache/kafka/pull/8981#issuecomment-656223017


   > Did this PR fix transaction tests as expected?
   
   Yep. core.transactions_test passes :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #8981: KAFKA-10235 Fix flaky transactions_test.py

2020-07-09 Thread GitBox


junrao merged pull request #8981:
URL: https://github.com/apache/kafka/pull/8981


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10235) Fix flaky transactions_test.py

2020-07-09 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10235.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

merged the PR to trunk.

> Fix flaky transactions_test.py
> --
>
> Key: KAFKA-10235
> URL: https://issues.apache.org/jira/browse/KAFKA-10235
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.0.0
>
>
> {code}
> =hard_bounce.bounce_target=clients.check_order=False.use_group_metadata=False:
>  FAIL: copier-1 : Message copier didn't make enough progress in 30s. Current 
> progress: 0
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", 
> line 134, in run
> data = self.run_test()
>   File 
> "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", 
> line 192, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 
> 429, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 
> 254, in test_transactions
> num_messages_to_copy=self.num_seed_messages, 
> use_group_metadata=use_group_metadata)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 
> 195, in copy_messages_transactionally
> self.bounce_copiers(copiers, clean_shutdown)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 
> 120, in bounce_copiers
> % (copier.transactional_id, str(copier.progress_percent(
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line 
> 41, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
> TimeoutError: copier-1 : Message copier didn't make enough progress in 30s. 
> Current progress: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452349346



##
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
        return connectingNodes.stream()
                .filter(id -> isConnectionSetupTimeout(id, now))
-               .collect(Collectors.toSet());
+               .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   Why are we using `LinkedList`? It's very rare where it should be used.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10254) 100% CPU usage by KafkaConsumer poll when broker can't be connected

2020-07-09 Thread Arvin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154725#comment-17154725
 ] 

Arvin Zheng commented on KAFKA-10254:
-

[~xiaotong.wang], see KAFKA-10134

> 100% CPU usage by KafkaConsumer poll when broker can't be connected
> 
>
> Key: KAFKA-10254
> URL: https://issues.apache.org/jira/browse/KAFKA-10254
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: xiaotong.wang
>Priority: Critical
> Attachments: image-2020-07-09-19-24-20-604.png
>
>
> Steps:
> 1. Start a Kafka broker.
> 2. Start Kafka consumers: subscribe to some topics with several KafkaConsumer 
> instances, call kafkaConsumer.*poll(Duration.ofMillis(pollTimeout))*, 
> and set auto.commit.enabled=false.
> 3. Use iptables to block the broker IP in the client VM, or shut down the brokers.
> 4. CPU goes to 100%.
>  
> *Why?*
>  
> Left version: 2.3.1
> Right version: 2.5.0
>  
> For the 2.3.1 KafkaConsumer, when the brokers go down, 
> updateAssignmentMetadataIfNeeded blocks for x ms and returns empty records:
> !image-2020-07-09-19-24-20-604.png|width=926,height=164!
>  
> For 2.5.0:
> private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
>  *long pollTimeout = coordinator == null ? timer.remainingMs() :*
>  *Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());*
> Checking the Kafka client source, the poll timeout is changed to 0 ms when the 
> heartbeat times out, so poll is called without blocking at all, which drives 
> CPU to 100%.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
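
To make the reported spin concrete, a stripped-down sketch of the timeout math 
(not the actual Fetcher code; the hard-coded values stand in for the 
expired-heartbeat case):

{code:java}
public final class PollTimeoutSketch {
    public static void main(String[] args) {
        long remainingMs = 1_000L;    // the caller's poll(Duration) budget
        long timeToNextPoll = 0L;     // heartbeat already overdue: coordinator wants to run "now"

        // Mirrors the Math.min() in pollForFetches: the effective timeout
        // collapses to 0, so the consumer's internal loop never blocks.
        long pollTimeout = Math.min(timeToNextPoll, remainingMs);
        System.out.println("effective poll timeout = " + pollTimeout + " ms");
    }
}
{code}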


[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452357020



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -215,7 +215,7 @@ public StateStore getGlobalStore(final String name) {
 }
 
 // package-private for test only
-void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
+void initializeStoreOffsetsFromCheckpoint(final boolean taskDirIsEmpty) {

Review comment:
   Was deceptively named.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -566,8 +567,20 @@ public void shouldReviveCorruptTasks() {
 topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), 
anyString());
 expectLastCall().anyTimes();
 
+expect(consumer.assignment()).andReturn(taskId00Partitions);
+consumer.pause(taskId00Partitions);
+expectLastCall();
+final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
+
expect(consumer.committed(taskId00Partitions)).andReturn(singletonMap(t1p0, 
offsetAndMetadata));
+consumer.seek(t1p0, offsetAndMetadata);
+expectLastCall();
+consumer.seekToBeginning(emptySet());
+expectLastCall();
 replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
-
+taskManager.setPartitionResetter(tp -> {
+assertThat(tp, is(empty()));
+return emptySet();
+});

Review comment:
   This all amounts to checking that we really reset the consumer to the 
last committed position.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##
@@ -504,7 +504,7 @@ private void resetOffsetPosition(TopicPartition tp) {
 if (strategy == OffsetResetStrategy.EARLIEST) {
 offset = beginningOffsets.get(tp);
 if (offset == null)
-throw new IllegalStateException("MockConsumer didn't have 
beginning offset specified, but tried to seek to beginning");
+throw new IllegalStateException("MockConsumer didn't have 
beginning offset for " + tp + " specified, but tried to seek to beginning");

Review comment:
   Just a quality-of-life improvement.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint 
with offset {} at changelog {}",
   store.stateStore.name(), store.offset, 
store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local 
state only contains committed data;
 // in that case we need to treat it as a 
task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't 
checking whether the store's specific
+// directory is nonempty, only if there are any 
directories for any stores. So if there are
+// two stores in a task, and one is correctly written 
and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ 
correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted 
and discard the first and recover both.
+if (store.stateStore.persistent() && eosEnabled && 
!taskDirIsEmpty) {

Review comment:
   Bugfix: we shouldn't call this task corrupted for not having a 
checkpoint of a non-persistent store.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -462,7 +468,7 @@ public void flush() {
  */
 @Override
 public void close() throws ProcessorStateException {
-log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
+log.info("Closing its state manager and all the registered state 
stores: {}", stores);

Review comment:
   ```suggestion
   log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -648,7 +650,7 @@ void runOnce() {
 
 // only try to initialize the assigned tasks

[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452365162



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint 
with offset {} at changelog {}",
   store.stateStore.name(), store.offset, 
store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local 
state only contains committed data;
 // in that case we need to treat it as a 
task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't 
checking whether the store's specific
+// directory is nonempty, only if there are any 
directories for any stores. So if there are
+// two stores in a task, and one is correctly written 
and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ 
correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted 
and discard the first and recover both.
+if (store.stateStore.persistent() && eosEnabled && 
!taskDirIsEmpty) {

Review comment:
   I'm not sure how this came up now for me. I don't think it was related 
to any changes in this PR, but the SuppressionDurabilityIntegrationTest fails 
every time without it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10131) Minimize use of --zookeeper flag in ducktape tests

2020-07-09 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar reassigned KAFKA-10131:
--

Assignee: Ron Dagostino  (was: Vinoth Chandar)

> Minimize use of --zookeeper flag in ducktape tests
> --
>
> Key: KAFKA-10131
> URL: https://issues.apache.org/jira/browse/KAFKA-10131
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Vinoth Chandar
>Assignee: Ron Dagostino
>Priority: Major
>
> Get the ducktape tests working without the --zookeeper flag (except for 
> scram).
> (Note: When doing compat testing we'll still use the old flags.)
> Below are the current usages 
> {code:java}
> [tests]$ grep -R -e "--zookeeper" . 
> ./kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py:# Cannot 
> use --zookeeper because kafka-topics.sh is unable to connect to a TLS-enabled 
> ZooKeeper quorum,
> ./kafkatest/tests/client/quota_test.py:cmd = "%s --zookeeper %s 
> --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
> ./kafkatest/services/console_consumer.py:cmd += " --zookeeper 
> %(zk_connect)s" % args
> ./kafkatest/services/security/security_config.py:cmd = "%s 
> --zookeeper %s --entity-name %s --entity-type users --alter --add-config 
> %s=[password=%s]" % \
> ./kafkatest/services/zookeeper.py:la_migra_cmd += "%s 
> --zookeeper.acl=%s --zookeeper.connect=%s %s" % \
> ./kafkatest/services/zookeeper.py:cmd = "%s kafka.admin.ConfigCommand 
> --zookeeper %s %s --describe --topic %s" % \
> # Used by MessageFormatChangeTest, TruncationTest 
> ./kafkatest/services/kafka/kafka.py:cmd += "%s --zookeeper %s %s 
> --entity-name %s --entity-type topics --alter --add-config 
> message.format.version=%s" % \
> ./kafkatest/services/kafka/kafka.py:cmd += "%s --zookeeper %s %s 
> --entity-name %s --entity-type topics --alter --add-config 
> unclean.leader.election.enable=%s" % \
> # called by reassign_partitions.sh, ThrottlingTest, ReassignPartitionsTest  
> ./kafkatest/services/kafka/kafka.py:cmd += "--zookeeper %s " % 
> self.zk_connect_setting()
> ./kafkatest/services/kafka/kafka.py:cmd += "--zookeeper %s " % 
> self.zk_connect_setting()
> ./kafkatest/services/kafka/kafka.py:connection_setting = 
> "--zookeeper %s" % (self.zk_connect_setting())
>  {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-07-09 Thread GitBox


omkreddy commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-656246737


   I agree, it's a useful fix. I think it's OK to break the existing links; I 
have not seen those links used anywhere. Let's see what others think.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh

2020-07-09 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154741#comment-17154741
 ] 

Vinoth Chandar commented on KAFKA-10174:


[https://github.com/apache/kafka/pull/8948]

 

> Prefer --bootstrap-server ducktape tests using kafka_configs.sh
> ---
>
> Key: KAFKA-10174
> URL: https://issues.apache.org/jira/browse/KAFKA-10174
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante opened a new pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown

2020-07-09 Thread GitBox


C0urante opened a new pull request #9003:
URL: https://github.com/apache/kafka/pull/9003


   A benign `WakeupException` can be thrown by a sink task's consumer if the task 
is scheduled for shutdown by the worker. This is caught and handled gracefully if 
the exception is thrown when calling `poll` on the consumer, but not when calling 
`commitSync`, which is invoked by a task during shutdown and also when its 
partition assignment is updated.
   
   If thrown during a partition assignment update, the `WakeupException` is 
caught and handled gracefully as part of the task's `iteration` loop. If thrown 
during shutdown, however, it is not caught and instead leads to the 
scary-looking log message "Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted.".
   
   These changes catch the `WakeupException` during shutdown and handle it 
gracefully with a `TRACE`-level log message.
   
   A unit test is added to verify this behavior by simulating a thrown 
`WakeupException` during `Consumer::commitSync`, running through the 
`WorkerSinkTask::execute` method, and confirming that it does not throw a 
`WakeupException` itself.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown

2020-07-09 Thread GitBox


C0urante commented on pull request #9003:
URL: https://github.com/apache/kafka/pull/9003#issuecomment-656247517


   @gharris1727 @chia7712 @ncliang would any of you be interested in reviewing 
this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10235) Fix flaky transactions_test.py

2020-07-09 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-10235:

Fix Version/s: (was: 3.0.0)
   2.7.0

> Fix flaky transactions_test.py
> --
>
> Key: KAFKA-10235
> URL: https://issues.apache.org/jira/browse/KAFKA-10235
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.7.0
>
>
> {code}
> =hard_bounce.bounce_target=clients.check_order=False.use_group_metadata=False:
>  FAIL: copier-1 : Message copier didn't make enough progress in 30s. Current 
> progress: 0
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", 
> line 134, in run
> data = self.run_test()
>   File 
> "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", 
> line 192, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 
> 429, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 
> 254, in test_transactions
> num_messages_to_copy=self.num_seed_messages, 
> use_group_metadata=use_group_metadata)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 
> 195, in copy_messages_transactionally
> self.bounce_copiers(copiers, clean_shutdown)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 
> 120, in bounce_copiers
> % (copier.transactional_id, str(copier.progress_percent(
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line 
> 41, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
> TimeoutError: copier-1 : Message copier didn't make enough progress in 30s. 
> Current progress: 0
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10213) Prefer --bootstrap-server in ducktape tests for Kafka clients

2020-07-09 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar reassigned KAFKA-10213:
--

Assignee: Ron Dagostino  (was: Vinoth Chandar)

> Prefer --bootstrap-server in ducktape tests for Kafka clients
> -
>
> Key: KAFKA-10213
> URL: https://issues.apache.org/jira/browse/KAFKA-10213
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vinoth Chandar
>Assignee: Ron Dagostino
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on pull request #8973: KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once

2020-07-09 Thread GitBox


C0urante commented on pull request #8973:
URL: https://github.com/apache/kafka/pull/8973#issuecomment-656247489


   @gharris1727 @chia7712 @ncliang would any of you be interested in reviewing 
this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656247859


   Looks like there was a spurious failure of the new test:
   ```
   test_id:
kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.to_version=2.5.0-SNAPSHOT.from_version=2.0.1.bounce_type=full
   status: FAIL
   run time:   1 minute 43.164 seconds
   
   
   Server connection dropped: 
   Traceback (most recent call last):
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py",
 line 134, in run
   data = self.run_test()
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py",
 line 192, in run_test
   return self.test_context.function(self.test)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/mark/_mark.py",
 line 429, in wrapper
   return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py",
 line 108, in test_app_upgrade
   self.restart_all_nodes_with(to_version)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py",
 line 173, in restart_all_nodes_with
   self.processor2.start_node(self.processor2.node)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/streams.py",
 line 305, in start_node
   node.account.create_file(self.CONFIG_FILE, prop_file)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/cluster/remoteaccount.py",
 line 588, in create_file
   with self.sftp_client.open(path, "w") as f:
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py",
 line 372, in open
   t, msg = self._request(CMD_OPEN, filename, imode, attrblock)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py",
 line 813, in _request
   return self._read_response(num)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py",
 line 845, in _read_response
   raise SSHException("Server connection dropped: {}".format(e))
   SSHException: Server connection dropped: 
   ```
   I'll re-run it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452370517



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -104,17 +104,16 @@ static void closeStateManager(final Logger log,
 if (stateDirectory.lock(id)) {
 try {
 stateMgr.close();
-
+} catch (final ProcessorStateException e) {
+firstException.compareAndSet(null, e);
+} finally {
 if (wipeStateStore) {
 log.debug("Wiping state stores for {} task {}", 
taskType, id);
 // we can just delete the whole dir of the task, 
including the state store images and the checkpoint files,
 // and then we write an empty checkpoint file 
indicating that the previous close is graceful and we just
 // need to re-bootstrap the restoration from the 
beginning
 Utils.delete(stateMgr.baseDir());

Review comment:
   Right, it's not a correctness issue but it's additional needless 
overhead to go through the whole cycle of initializing a task, getting a 
TaskCorrupted, wiping it then, and finally restarting it. Of course if we keep 
hitting an issue during `closeDirty` then we might never wipe the state, which 
does seem like a real problem. For example if there's some issue with the 
state, like the files are actually corrupted or something





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656249347


   Ah, and the other smoke test usages are messed up:
   ```
   test_id:
kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_rebalance_simple
   status: FAIL
   run time:   1 minute 28.121 seconds
   
   
   __init__() takes exactly 4 arguments (3 given)
   Traceback (most recent call last):
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py",
 line 134, in run
   data = self.run_test()
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py",
 line 192, in run_test
   return self.test_context.function(self.test)
 File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_eos_test.py",
 line 41, in test_rebalance_simple
   self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, 
self.kafka),
   TypeError: __init__() takes exactly 4 arguments (3 given)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10258) Get rid of use_zk_connection flag in kafka.py public methods

2020-07-09 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10258:
--

 Summary: Get rid of use_zk_connection flag in kafka.py public 
methods
 Key: KAFKA-10258
 URL: https://issues.apache.org/jira/browse/KAFKA-10258
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vinothchandar commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests

2020-07-09 Thread GitBox


vinothchandar commented on pull request #8948:
URL: https://github.com/apache/kafka/pull/8948#issuecomment-656251801


   @cmccabe this PR is a sub-task under KAFKA-10131, which tracks the bigger 
goal. I filed a new issue explicitly targeting removal of `use_zk_connection`. 
For this PR, the affected public methods, 
`alter_message_format`/`set_unclean_leader_election`, don't offer this flag.
   
   So this PR should be fine for this scope?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-09 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r452374925



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, 
RequestFuture f
  *   value of each partition may be null only for v0. In v1 
and later the ListOffset API would not
  *   return a null timestamp (-1 is returned instead when 
necessary).
  */
-private void handleListOffsetResponse(Map timestampsToSearch,
+private void handleListOffsetResponse(Map timestampsToSearch,
   ListOffsetResponse 
listOffsetResponse,
   RequestFuture 
future) {
 Map fetchedOffsets = new HashMap<>();
 Set partitionsToRetry = new HashSet<>();
 Set unauthorizedTopics = new HashSet<>();
 
-for (Map.Entry entry 
: timestampsToSearch.entrySet()) {
+Map partitionsData = 
byTopicPartitions(listOffsetResponse.responseData());

Review comment:
   We now added logic in the AdminClient to handle partial responses from 
brokers (based on 
https://github.com/apache/kafka/pull/8295#discussion_r449575550). Shouldn't we 
do the same here instead of assuming the response is always complete? I'm not 
even sure if we should retry if a resource is missing from the response but we 
could at least log it instead of hitting a NPE.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
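
A minimal sketch of the defensive pattern suggested above, using plain maps 
rather than the actual request/response classes (all names illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative partial-response handling: tolerate entries missing from the
// broker's response instead of assuming it is complete and hitting an NPE.
final class PartialResponseSketch {
    static Map<String, Long> handle(final Map<String, Long> requested,
                                    final Map<String, Long> responseData) {
        final Map<String, Long> fetched = new HashMap<>();
        for (final String resource : requested.keySet()) {
            final Long offset = responseData.get(resource);
            if (offset == null) {
                // Log and skip (or schedule a retry) rather than dereferencing null.
                System.err.println("No response data for " + resource + ", skipping");
                continue;
            }
            fetched.put(resource, offset);
        }
        return fetched;
    }
}
```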




[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452381474



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint 
with offset {} at changelog {}",
   store.stateStore.name(), store.offset, 
store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown 
gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local 
state only contains committed data;
 // in that case we need to treat it as a 
task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't 
checking whether the store's specific
+// directory is nonempty, only if there are any 
directories for any stores. So if there are
+// two stores in a task, and one is correctly written 
and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ 
correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted 
and discard the first and recover both.
+if (store.stateStore.persistent() && eosEnabled && 
!taskDirIsEmpty) {

Review comment:
   I like my fix better :P 
   
   But seriously, no need to block this PR on mine if it's suddenly causing 
tests to fail. Mysterious..





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


mjsax commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261242


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


mjsax commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261358


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452372357



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -784,7 +784,7 @@ public void close() {
 }
 
 @Test
-public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws 
IOException {
+public void 
shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws 
IOException {
 final long checkpointOffset = 10L;
 
 final Map<TopicPartition, Long> offsets = mkMap(

Review comment:
   Sorry yeah the relevant part doesn't show up on github. Basically we 
register
   ```  
   stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback);   
   stateMgr.registerStore(persistentStoreTwo, 
persistentStoreTwo.stateRestoreCallback); 
   stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback);
   ```
   
   but only write the checkpoint for the `persistentStorePartition`, 
`nonPersistentStorePartition` and `irrelevantPartition` . I think the point of 
the `irrelevantPartition` is to make sure that we detect that the 
`persistentStoreTwoPartition` offset is missing even though the checkpoint 
technically has the correct number of offsets in total. ie, that we actually 
map the offsets to a registered changelog





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
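
A toy sketch of the property this test pins down: coverage has to be checked 
against the registered changelogs, not against the raw count of checkpointed 
offsets (store and partition names are illustrative):

```java
import java.util.Map;
import java.util.Set;

// Illustrative check: a checkpoint can hold the "right" number of offsets and
// still miss a registered changelog if it contains an irrelevant partition.
final class CheckpointCoverageSketch {
    static boolean coversAllRegistered(final Set<String> registeredChangelogs,
                                       final Map<String, Long> checkpointedOffsets) {
        return checkpointedOffsets.keySet().containsAll(registeredChangelogs);
    }

    public static void main(String[] args) {
        final Set<String> registered =
                Set.of("persistentStore", "persistentStoreTwo", "nonPersistentStore");
        final Map<String, Long> checkpoint = Map.of(
                "persistentStore", 10L,
                "nonPersistentStore", 10L,
                "irrelevant", 10L); // same size as registered, still incomplete
        System.out.println(coversAllRegistered(registered, checkpoint)); // prints false
    }
}
```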




[GitHub] [kafka] cadonna commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656263751


   Unrelated test failure:
   
   ```
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


ableegoldman commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656265143


   Aw...the EosBetaUpgradeIntegrationTest failed again?? :/ 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452391941



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,24 +45,23 @@
 protected ProcessorNode currentNode;
 private long currentSystemTimeMs;
 
-final StateManager stateManager;

Review comment:
   I see. Would it not be sufficient to just keep a ("duplicate") reference 
of `ProcessorStateManager` within `ProcessorContextImpl`? 
   
   Just to clarify: I am ok with the proposed changes. Just wondering if it's 
really the best structure.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452393587



##
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
        return connectingNodes.stream()
                .filter(id -> isConnectionSetupTimeout(id, now))
-               .collect(Collectors.toSet());
+               .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   We only add elements to the List and then iterate over it. A LinkedList 
seems slightly better for this case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452394510



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
 @SuppressWarnings("unchecked")
 void initStoreSerde(final ProcessorContext context) {
+final String storeName = name();
+final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, storeName);
 serdes = new StateSerdes<>(
-ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
+ changelogTopic != null ?
+changelogTopic :
+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   Ah thanks. I missed this case.
   
   However, should we move both `null` checks into 
`ProcessorContextUtils.changelogFor()` for this case? It seems we do the same 
"outer" `null` check each time we call the method, so why not do it in a single 
place in the code?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
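
A minimal sketch of the consolidation suggested above, assuming the helper just 
falls back to the `<applicationId>-<storeName>-changelog` convention when no 
changelog is registered (signature and names are illustrative, not the actual 
`ProcessorContextUtils` API):

```java
// Illustrative helper: callers always get a non-null changelog topic and
// never repeat the outer null check themselves.
final class ChangelogTopics {
    private ChangelogTopics() { }

    static String changelogFor(final String registeredChangelog,
                               final String applicationId,
                               final String storeName) {
        return registeredChangelog != null
                ? registeredChangelog
                : applicationId + "-" + storeName + "-changelog";
    }
}
```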




[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452395964



##
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
        return connectingNodes.stream()
                .filter(id -> isConnectionSetupTimeout(id, now))
-               .collect(Collectors.toSet());
+               .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   Not at all. This is a perfect case for ArrayList. LinkedList iteration 
is very slow due to pointer chasing (in comparison).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452396471



##
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
        return connectingNodes.stream()
                .filter(id -> isConnectionSetupTimeout(id, now))
-               .collect(Collectors.toSet());
+               .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   FYI https://twitter.com/joshbloch/status/583813919019573248 :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452400759



##
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
        return connectingNodes.stream()
                .filter(id -> isConnectionSetupTimeout(id, now))
-               .collect(Collectors.toSet());
+               .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   That’s good to know. I always thought that the iteration was more or 
less equivalent. I have learnt something today. Let me update that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
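
In concrete terms, the suggested change is just a different collector; a 
self-contained sketch with illustrative data (the filter stands in for 
`isConnectionSetupTimeout`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class CollectorSketch {
    public static void main(String[] args) {
        final Set<String> connectingNodes = Set.of("node-1", "node-2", "node-3");

        // ArrayList stores elements in one contiguous array, so later
        // iteration avoids LinkedList's per-element pointer chasing.
        final List<String> timedOut = connectingNodes.stream()
                .filter(id -> id.endsWith("2"))
                .collect(Collectors.toCollection(ArrayList::new));

        System.out.println(timedOut); // [node-2]
    }
}
```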




[jira] [Assigned] (KAFKA-6453) Reconsider timestamp propagation semantics

2020-07-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6453:
--

Assignee: James Galasyn  (was: Victoria Bialas)

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processors within a sub-topology see the 
> timestamp from the input topic record, and this timestamp will be used for all 
> result records when writing them to a topic, too.
> The DSL inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a Processor can 
> set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics

2020-07-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6453:
---
Summary: Document timestamp propagation semantics  (was: Reconsider 
timestamp propagation semantics)

> Document timestamp propagation semantics
> 
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processors within a sub-topology see the 
> timestamp from the input topic record, and this timestamp will be used for all 
> result records when writing them to a topic, too.
> The DSL inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a Processor can 
> set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics

2020-07-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6453:
---
Description: 
Atm, Kafka Streams only has a defined "contract" about timestamp propagation at 
the Processor API level: all processors within a sub-topology see the timestamp 
from the input topic record, and this timestamp will be used for all result 
records when writing them to a topic, too.

The DSL inherits this "contract" atm.

From a DSL point of view, it would be desirable to provide a different 
contract to the user. To allow this, we need to do the following:

 - extend the Processor API to allow manipulating timestamps (i.e., a Processor can 
set a new timestamp for downstream records)
  - define a DSL "contract" for timestamp propagation for each DSL operator
  - document the DSL "contract"
  - implement the DSL "contract" using the new/extended Processor API

Changing the DSL contract etc. was done via KIP-258. This ticket is about 
documenting the contract.

  was:
Atm, Kafka Streams only has a defined "contract" about timestamp propagation at 
the Processor API level: all processors within a sub-topology see the timestamp 
from the input topic record, and this timestamp will be used for all result 
records when writing them to a topic, too.

The DSL inherits this "contract" atm.

From a DSL point of view, it would be desirable to provide a different 
contract to the user. To allow this, we need to do the following:

 - extend the Processor API to allow manipulating timestamps (i.e., a Processor can 
set a new timestamp for downstream records)
 - define a DSL "contract" for timestamp propagation for each DSL operator
 - document the DSL "contract"
 - implement the DSL "contract" using the new/extended Processor API


> Document timestamp propagation semantics
> 
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processors within a sub-topology see the 
> timestamp from the input topic record, and this timestamp will be used for all 
> result records when writing them to a topic, too.
> The DSL inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a Processor can 
> set a new timestamp for downstream records)
>   - define a DSL "contract" for timestamp propagation for each DSL operator
>   - document the DSL "contract"
>   - implement the DSL "contract" using the new/extended Processor API
> Changing the DSL contract etc. was done via KIP-258. This ticket is about 
> documenting the contract.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
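
For the first bullet, timestamp manipulation landed in the Processor API via 
KIP-251; a minimal sketch of a processor that sets a new downstream timestamp 
(the processor name and timestamp choice are illustrative):

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

// Illustrative processor: forwards each record with an explicit timestamp
// instead of inheriting the input record's timestamp.
public class TimestampOverrideProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        final long newTimestamp = System.currentTimeMillis(); // illustrative choice
        context().forward(key, value, To.all().withTimestamp(newTimestamp));
    }
}
{code}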


[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452408268



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                           store.stateStore.name(), store.offset, store.changelogPartition);
             } else {
-                // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                 // and hence we are uncertain that the current local state only contains committed data;
                 // in that case we need to treat it as a task-corrupted exception
-                if (eosEnabled && !storeDirIsEmpty) {
+
+                // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                // directory is nonempty, only if there are any directories for any stores. So if there are
+                // two stores in a task, and one is correctly written and checkpointed, while the other is
+                // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                // but instead we'll consider the whole task corrupted and discard the first and recover both.

Review comment:
   Sound like something we should fix. Can you file a ticket?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
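
To make the trade-off in the note above concrete, here is a self-contained 
sketch, using illustrative types rather than Kafka's actual internals, that 
contrasts the task-level check in the diff with the per-store check the 
reviewer suggests ticketing:
{code:java}
import java.util.List;

// Illustrative sketch only: StoreState is a stand-in, not Kafka's
// StateStoreMetadata, and the real ProcessorStateManager logic differs.
final class CorruptionCheckSketch {

    static final class StoreState {
        final String name;
        final Long checkpointedOffset; // null means no checkpoint was found
        final boolean dirIsEmpty;

        StoreState(final String name, final Long checkpointedOffset, final boolean dirIsEmpty) {
            this.name = name;
            this.checkpointedOffset = checkpointedOffset;
            this.dirIsEmpty = dirIsEmpty;
        }
    }

    // Task-level check, as in the diff: any non-empty store directory plus
    // any missing checkpoint marks the whole task corrupted, including
    // stores that were written and checkpointed correctly.
    static boolean taskIsCorrupted(final List<StoreState> stores, final boolean eosEnabled) {
        return eosEnabled
            && stores.stream().anyMatch(s -> !s.dirIsEmpty)
            && stores.stream().anyMatch(s -> s.checkpointedOffset == null);
    }

    // Per-store refinement: judge each store against its own directory, so a
    // cleanly checkpointed store could be loaded while only the other recovers.
    static boolean storeIsCorrupted(final StoreState store, final boolean eosEnabled) {
        return eosEnabled && store.checkpointedOffset == null && !store.dirIsEmpty;
    }
}
{code}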




[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452410965



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -95,7 +96,7 @@
  *  |  | Assigned (3)| <+
  *  |  +-+---+  |
  *  ||  |
- *  ||  |
+ *  ||--+

Review comment:
   Where does this self-transition happen exactly? And could/should we 
detect this case and skip the `setState()` call instead of allowing 
the transition?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
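
One possible shape for the guard the comment asks about, sketched with 
illustrative names (the real StreamThread.State enum and its setState() 
differ): detect the self-transition up front and return early instead of 
adding a self-edge to the diagram:
{code:java}
// Illustrative only; not the actual StreamThread state machine.
enum ThreadState { CREATED, STARTING, PARTITIONS_ASSIGNED, RUNNING }

final class StateMachineSketch {
    private ThreadState state = ThreadState.CREATED;

    synchronized ThreadState setState(final ThreadState newState) {
        final ThreadState oldState = state;
        if (newState == oldState) {
            // Short-circuit the self-transition instead of modeling it as
            // an allowed edge in the transition diagram.
            return oldState;
        }
        // validateTransition(oldState, newState) would go here
        state = newState;
        return oldState;
    }
}
{code}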




[jira] [Commented] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop

2020-07-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154790#comment-17154790
 ] 

Chris Egerton commented on KAFKA-10253:
---

[~klalafaryan] can you provide the configurations for all three of your workers?

It would also be helpful if you could restart one of the workers and capture 
the logs for it from startup through the first few iterations of this rebalance 
loop.

> Kafka Connect gets into an infinite rebalance loop
> --
>
> Key: KAFKA-10253
> URL: https://issues.apache.org/jira/browse/KAFKA-10253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Konstantin Lalafaryan
>Priority: Blocker
>
> Hello everyone!
>  
> We are running a kafka-connect cluster (3 workers), and very often it gets 
> into an infinite rebalance loop.
>  
> {code:java}
> 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Was selected to perform assignments, but do not have latest 
> config found in sync request. Returning an empty configuration to trigger 
> re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Successfully joined group with generation 305655831 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Joined group at generation 305655831 with protocol version 2 
> and got assignment: Assignment{error=1, 
> leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
> leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Was selected to perform assignments, but do not have latest 
> config found in sync request. Returning an empty configuration to trigger 
> re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Successfully joined group with generation 305655832 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Joined group at generation 305655832 with protocol version 2 
> and got assignment: Assignment{error=1, 
> leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
> leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Was selected to perform assignments, but do not have latest 
> config found in sync request. Returning an empty configuration to trigger 
> re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Successfully joined gr

[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452415850



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final String storeName = name();
+        final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   I would not put any code that is not related to casts of 
`ProcessorContext` into `ProcessorContextUtils`. I think the goal of 
`ProcessorContextUtils` is to contain all the code that we want to get rid of 
in the future once the casts are fixed.
   
   We could move the `null` check into the constructor of `StateSerdes`, since 
we already do a `null` check there. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
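
A sketch of the alternative raised above, with illustrative names rather than 
the real StateSerdes signature: the fallback from a null changelog topic to 
the derived "<applicationId>-<storeName>-changelog" name moves into the 
constructor, so call sites drop the null check:
{code:java}
// Illustrative stand-in for StateSerdes; only the topic handling is shown.
final class StateSerdesSketch {

    // Mirrors the "<applicationId>-<storeName>-changelog" naming convention
    // used by ProcessorStateManager.storeChangelogTopic(...).
    static String storeChangelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    final String topic;

    StateSerdesSketch(final String changelogTopic,
                      final String applicationId,
                      final String storeName) {
        // The null check lives here instead of at every call site, as the
        // review comment suggests.
        this.topic = changelogTopic != null
            ? changelogTopic
            : storeChangelogTopic(applicationId, storeName);
    }
}
{code}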



