[jira] [Commented] (KAFKA-9435) Replace DescribeLogDirs request/response with automated protocol

2020-01-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17016850#comment-17016850
 ] 

ASF GitHub Bot commented on KAFKA-9435:
---

tombentley commented on pull request #7972: KAFKA-9435: DescribeLogDirs 
automated protocol
URL: https://github.com/apache/kafka/pull/7972
 
 
   Also add version 2 to make use of flexible versions, per KIP-482.
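   For reference, a flexible-versions bump in Kafka's JSON protocol definitions looks roughly like the sketch below (abbreviated and from memory, not copied from the actual DescribeLogDirsRequest.json; treat the apiKey and field names as assumptions):

```json
{
  "apiKey": 35,
  "type": "request",
  "name": "DescribeLogDirsRequest",
  "validVersions": "0-2",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+",
      "about": "The topics whose log directories should be described." }
  ]
}
```

   Setting `flexibleVersions` to `"2+"` is what makes version 2 use the compact encodings and tagged fields introduced by KIP-482.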
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace DescribeLogDirs request/response with automated protocol
> 
>
> Key: KAFKA-9435
> URL: https://issues.apache.org/jira/browse/KAFKA-9435
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9443) Producer Can Fail with NPE

2020-01-16 Thread David Mollitor (Jira)
David Mollitor created KAFKA-9443:
-

 Summary: Producer Can Fail with NPE
 Key: KAFKA-9443
 URL: https://issues.apache.org/jira/browse/KAFKA-9443
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mollitor


{code:none}
kafka.api.ClientIdQuotaTest > testProducerConsumerOverrideUnthrottled FAILED
java.lang.NullPointerException
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1141)
at org.apache.kafka.clients.producer.KafkaProducer.&lt;init&gt;(KafkaProducer.java:429)
at org.apache.kafka.clients.producer.KafkaProducer.&lt;init&gt;(KafkaProducer.java:312)
at 
kafka.api.IntegrationTestHarness.createProducer(IntegrationTestHarness.scala:123)
at 
kafka.api.ClientIdQuotaTest.createQuotaTestClients(ClientIdQuotaTest.scala:37)
at kafka.api.BaseQuotaTest.setUp(BaseQuotaTest.scala:76)
at kafka.api.ClientIdQuotaTest.setUp(ClientIdQuotaTest.scala:33)
{code}

I believe the issue is that some small amount of initialization happens before 
the logger is configured. If a failure occurs quickly, an exception is thrown 
and the logger is used to report the error, but it's not yet set up, so it 
fails with an NPE.
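A minimal sketch of the suspected failure mode, with hypothetical names (this is not the actual KafkaProducer code): if construction fails before the logger field is assigned, an unguarded cleanup path dies with an NPE and masks the original error, so the cleanup guards the logger.

```java
// Hypothetical sketch of the suspected failure mode; not the actual KafkaProducer code.
class FragileClient {
    private Object config; // initialized first
    private Logger log;    // initialized later, so it is null if we fail early

    FragileClient(boolean failEarly) {
        try {
            this.config = new Object();
            if (failEarly) {
                // Failure happens before the logger is configured.
                throw new IllegalStateException("config error");
            }
            this.log = new Logger();
        } catch (RuntimeException e) {
            close(e); // cleanup runs while log may still be null
            throw e;  // the original error survives
        }
    }

    void close(Exception cause) {
        // Guarding the logger prevents an NPE from masking the real error.
        if (log != null) {
            log.error("Failed to construct client", cause);
        }
    }

    static class Logger {
        void error(String msg, Exception e) {
            System.out.println(msg + ": " + e);
        }
    }
}
```

With the guard in place, constructing `FragileClient(true)` surfaces the original IllegalStateException instead of an NPE from the half-initialized cleanup path.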





[jira] [Commented] (KAFKA-9423) Refine layout of configuration options on website and make individual settings directly linkable

2020-01-16 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017063#comment-17017063
 ] 

Sönke Liebau commented on KAFKA-9423:
-

Thinking about this a little further: is there any appetite for replacing all 
that string concatenation with some form of HTML-generation library?
I've just used [j2html|https://j2html.com/] and found it fairly lightweight and 
usable.
Since that code is not needed at runtime but only to build the site during the 
Gradle run, we could probably have it as a compile-only dependency and not 
clutter the release tars with any additional dependencies.
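To illustrate the trade-off, here is a hand-rolled stand-in for that style, assuming nothing about j2html's real API (its TagCreator factories differ): fluent helpers keep the nesting visible and make the linkable id attributes hard to forget, compared with raw string concatenation.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hand-rolled stand-in for an HTML-generation library; j2html's actual API differs.
class Html {
    static String tag(String name, String body) {
        return "<" + name + ">" + body + "</" + name + ">";
    }

    static String anchor(String id, String text) {
        // Each setting gets a stable id so it can be linked to directly.
        return "<a id=\"" + id + "\" href=\"#" + id + "\">" + text + "</a>";
    }

    static String configList(List<String> names) {
        // Build the whole list of settings as one <ul> of linkable <li> entries.
        return tag("ul", names.stream()
                .map(n -> tag("li", anchor(n, n)))
                .collect(Collectors.joining()));
    }
}
```

For example, `Html.configList(List.of("acks"))` produces a `<ul>` whose single item carries `id="acks"`, which is exactly the directly-linkable anchor this ticket asks for.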

> Refine layout of configuration options on website and make individual 
> settings directly linkable
> 
>
> Key: KAFKA-9423
> URL: https://issues.apache.org/jira/browse/KAFKA-9423
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Sönke Liebau
>Assignee: Sönke Liebau
>Priority: Trivial
> Attachments: option1.png, option2.png, option3.png, option4.png
>
>
> KAFKA-8474 changed the layout of configuration options on the website from a 
> table which over time ran out of horizontal space to a list.
> This vastly improved readability but is not yet ideal. Further discussion was 
> had in the [PR|https://github.com/apache/kafka/pull/6870] for KAFKA-8474.
> This ticket is to move that discussion to a separate thread and make it more 
> visible to other people and to give subsequent PRs a home.
> Currently proposed options are attached to this issue.





[jira] [Updated] (KAFKA-9424) Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer

2020-01-16 Thread Steven Lu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Lu updated KAFKA-9424:
-
Reviewer: Ismael Juma  (was: Guangyuan Wang)

> Using AclCommand,avoid call the global method loadcache in SimpleAclAuthorizer
> --
>
> Key: KAFKA-9424
> URL: https://issues.apache.org/jira/browse/KAFKA-9424
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, tools
>Affects Versions: 0.10.2.0, 2.4.0, 2.3.1
> Environment: Linux,JDK7+
>Reporter: Steven Lu
>Priority: Major
>  Labels: Solved
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> In the AclCommand class we configure SimpleAclAuthorizer, but there is no need 
> to call loadCache.
> We now have 20,000 topics in our Kafka cluster; every time I run AclCommand, 
> the ACLs of all of these topics need to be loaded, which is very slow.
> The purpose of this optimization is that we can choose not to load the ACLs of 
> all topics into memory, mainly for adding and deleting permissions.
> PR available here: [https://github.com/apache/kafka/pull/7706]
> Mainly for adding and deleting permissions, we can choose not to load the ACLs 
> of all topics into memory by adding the two args "--load-acl-cache" "false" in 
> AclCommand.main; if you don't add these args, the ACL cache is loaded by 
> default.
> This can improve the running time from minutes to less than one second.





[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool

2020-01-16 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017077#comment-17017077
 ] 

highluck commented on KAFKA-9042:
-

[~mjsax]  [~bchen225242] 

I created a KIP and modified the code by adding test code. Can I ask you to 
review it again?


[KIP-560: Auto infer external topic partitions in stream reset 
tool|https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool?src=jira]

> Auto infer external topic partitions in stream reset tool
> -
>
> Key: KAFKA-9042
> URL: https://issues.apache.org/jira/browse/KAFKA-9042
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Assignee: highluck
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> As of today, a user has to specify `--input-topic` in the stream reset tool to 
> be able to reset offsets to a specific position. For a stream job with multiple 
> external topics that need to be purged, users usually don't want to name all 
> the topics in order to reset the offsets. It's really painful to look through 
> the entire topology to make sure we purge all the committed offsets.
> We could add a config `--reset-all-external-topics` to the reset tool such 
> that when enabled, we could delete offsets for all involved topics. The topic 
> metadata could be acquired by issuing a `DescribeGroup` request from admin 
> client, which is stored in the member subscription information.





[jira] [Comment Edited] (KAFKA-9042) Auto infer external topic partitions in stream reset tool

2020-01-16 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017077#comment-17017077
 ] 

highluck edited comment on KAFKA-9042 at 1/16/20 4:18 PM:
--

[~mjsax]  [~bchen225242]

I created a KIP and modified the code by adding test code. Can I ask you to 
review it again?

 

[GitHub Pull Request #7948|https://github.com/apache/kafka/pull/7948]

[KIP-560: Auto infer external topic partitions in stream reset 
tool|https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool?src=jira]


was (Author: high.lee):
[~mjsax]  [~bchen225242] 

I create a KIP, I modified the code by adding TestCode. Can I ask you to review 
again?


[KIP-560: Auto infer external topic partitions in stream reset 
tool|https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool?src=jira]

> Auto infer external topic partitions in stream reset tool
> -
>
> Key: KAFKA-9042
> URL: https://issues.apache.org/jira/browse/KAFKA-9042
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Assignee: highluck
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> As of today, a user has to specify `--input-topic` in the stream reset tool to 
> be able to reset offsets to a specific position. For a stream job with multiple 
> external topics that need to be purged, users usually don't want to name all 
> the topics in order to reset the offsets. It's really painful to look through 
> the entire topology to make sure we purge all the committed offsets.
> We could add a config `--reset-all-external-topics` to the reset tool such 
> that when enabled, we could delete offsets for all involved topics. The topic 
> metadata could be acquired by issuing a `DescribeGroup` request from admin 
> client, which is stored in the member subscription information.





[jira] [Commented] (KAFKA-9440) Add ConsumerGroupCommand to delete static members

2020-01-16 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017080#comment-17017080
 ] 

highluck commented on KAFKA-9440:
-

[~bchen225242]

Can I take this ticket?

> Add ConsumerGroupCommand to delete static members
> -
>
> Key: KAFKA-9440
> URL: https://issues.apache.org/jira/browse/KAFKA-9440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, kip, newbie, newbie++
>
> We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It 
> would be good to instantiate the API as part of the ConsumerGroupCommand for 
> easy command line usage. 
> This change requires a new KIP; just posting it out here in case anyone who 
> uses static membership would like to pick it up.





[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-01-16 Thread Raman Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017287#comment-17017287
 ] 

Raman Gupta commented on KAFKA-8803:


Very very strange. Following up on my last message, after a client restart, my 
stream is showing a `-` for multiple partition offsets (no offsets committed), 
even though every partition is assigned a consumer. The stream does not read 
any of the messages on that partition until more messages are sent to it. At 
that point it reads those messages and updates the offset. However, I certainly 
have messages in that topic that were *never* processed by my EXACTLY_ONCE 
stream. It all seems related to this issue, and this is very scary -- now I'm 
wondering what else hasn't been processed from Kafka.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Boyang Chen
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.





[jira] [Created] (KAFKA-9444) Cannot connect to zookeeper after updating the kafka config

2020-01-16 Thread Rishabh Bohra (Jira)
Rishabh Bohra created KAFKA-9444:


 Summary: Cannot connect to zookeeper after updating the kafka 
config
 Key: KAFKA-9444
 URL: https://issues.apache.org/jira/browse/KAFKA-9444
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Rishabh Bohra


h4. *Issue:*

While connecting Kafka to ZooKeeper at a custom path, this error message pops 
up in broker-0:

{{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
(kafka.server.KafkaServer)}}
{{ kafka.common.InconsistentClusterIdException: The Cluster ID 
2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
the wrong cluster. Configured zookeeper.connect may be wrong.}}
{{ at kafka.server.KafkaServer.startup(KafkaServer.scala:220)}}
{{ at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)}}
{{ at kafka.Kafka$.main(Kafka.scala:84)}}
{{ at kafka.Kafka.main(Kafka.scala)}}

*{{Steps:}}*

{{1. Start the kafka cluster without any zookeeper path/url}}
{{2. Update the kafka configuration with the provided path for zookeeper.}}
{{3. Check broker logs}}

*Package Version:*

Zookeeper: 3.4.14
Zookeeper Client - Zookeeper-3.4.14.jar
Kafka - 2.12-2.4.0





[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2020-01-16 Thread Raman Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017311#comment-17017311
 ] 

Raman Gupta commented on KAFKA-8803:


Is it possible this is related in the opposite direction? i.e. the offsets 
expired due to `offsets.retention.minutes`, and then because of that this issue 
is triggered for the stream for which offsets were expired? I don't see why 
that would be the case in this situation, because the stream has been running 
continuously, and since the consumer group was never empty for more than 7 
days, I see no reason why the offsets should have been expired. However, I 
throw this out there for consideration.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Boyang Chen
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.





[jira] [Updated] (KAFKA-9444) Cannot connect to zookeeper after updating the kafka config

2020-01-16 Thread Rishabh Bohra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rishabh Bohra updated KAFKA-9444:
-
Description: 
h4. *Issue:*

While connecting Kafka to ZooKeeper at a custom path, this error message pops 
up in broker-0:

{{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
(kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
the wrong cluster. Configured zookeeper.connect may be wrong. at 
kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}

*Steps:*

{{1. Start the kafka cluster without any zookeeper path/url}}
 {{2. Update the kafka configuration with the provided path for zookeeper.}}
 {{3. Check broker logs}}

*Package Version:*

Zookeeper: 3.4.14
 Zookeeper Client - Zookeeper-3.4.14.jar
 Kafka - 2.12-2.4.0

  was:
h4. *Issue:*

While connecting the kafka with zookeeper at a custom path, in the broker-0, 
this error message pops up-

{{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
(kafka.server.KafkaServer)}}
{{ kafka.common.InconsistentClusterIdException: The Cluster ID 
2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
the wrong cluster. Configured zookeeper.connect may be wrong.}}
{{ at kafka.server.KafkaServer.startup(KafkaServer.scala:220)}}
{{ at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)}}
{{ at kafka.Kafka$.main(Kafka.scala:84)}}
{{ at kafka.Kafka.main(Kafka.scala)}}

*{{Steps:}}*

{{1. Start the kafka cluster without any zookeeper path/url}}
{{2. Update the kafka configuration with the provided path for zookeeper.}}
{{3. Check broker logs}}

*Package Version:

*Zookeeper: 3.4.14
Zookeeper Client - Zookeeper-3.4.14.jar
Kafka - 2.12-2.4.0


> Cannot connect to zookeeper after updating the kafka config
> ---
>
> Key: KAFKA-9444
> URL: https://issues.apache.org/jira/browse/KAFKA-9444
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Rishabh Bohra
>Priority: Major
>  Labels: mesosphere
>
> h4. *Issue:*
> While connecting the kafka with zookeeper at a custom path, in the broker-0, 
> this error message pops up-
> {{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
> Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
> Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
> the wrong cluster. Configured zookeeper.connect may be wrong. at 
> kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
> kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}
> *Steps:*
> {{1. Start the kafka cluster without any zookeeper path/url}}
>  {{2. Update the kafka configuration with the provided path for zookeeper.}}
>  {{3. Check broker logs}}
> *Package Version:*
> Zookeeper: 3.4.14
>  Zookeeper Client - Zookeeper-3.4.14.jar
>  Kafka - 2.12-2.4.0





[jira] [Commented] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2020-01-16 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017315#comment-17017315
 ] 

highluck commented on KAFKA-7658:
-

[~mjsax]

If I am not ready now, can I take my ticket with me?

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Aishwarya Pradeep Kumar
>Priority: Major
>  Labels: kip, newbie
>
> KIP-523: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL]
>  
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code:java}
> KTable<K, V> KStream.toTable()
> KTable<K, V> KStream.toTable(Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
> {code}
> The function re-interprets the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a dummy 
> {{KStream.reduce()}} function that always takes the new value, as it differs in 
> the following ways:
> 1) an aggregation operator of {{KStream}} aggregates an event stream into an 
> evolving table, dropping null values from the input event stream; whereas a 
> {{toTable}} function completely changes the semantics of the input stream from 
> event stream to changelog stream, where null values are still serialized, and 
> if the resulting bytes are also null they are interpreted as "deletes" to the 
> materialized KTable (i.e. tombstones in the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas the 
> KTable resulting from {{toTable}} may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream 
> (whatever the reason they cannot read the source topic as a changelog stream 
> {{KTable}} in the first place) should use this new API instead of the dummy 
> reduction function.
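Difference 1) above can be sketched with plain maps (an illustrative sketch only, not the Streams implementation): an aggregation drops null-valued records, while changelog interpretation treats them as tombstones.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative-only sketch of difference 1); not the Streams implementation.
class ChangelogSemantics {
    // Aggregation over an event stream: null values are dropped from the input.
    static void applyAsAggregation(Map<String, String> table, String key, String value) {
        if (value == null) {
            return; // aggregations skip null-valued records
        }
        table.put(key, value);
    }

    // Changelog interpretation (toTable): a null value is a tombstone deleting the key.
    static void applyAsChangelog(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }
}
```

Feeding the record sequence ("k", "v") then ("k", null) into each leaves the key present under aggregation semantics but deleted under changelog semantics.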





[jira] [Issue Comment Deleted] (KAFKA-9440) Add ConsumerGroupCommand to delete static members

2020-01-16 Thread highluck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

highluck updated KAFKA-9440:

Comment: was deleted

(was: [~bchen225242]

Can I take this ticket with me?)

> Add ConsumerGroupCommand to delete static members
> -
>
> Key: KAFKA-9440
> URL: https://issues.apache.org/jira/browse/KAFKA-9440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, kip, newbie, newbie++
>
> We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It 
> would be good to instantiate the API as part of the ConsumerGroupCommand for 
> easy command line usage. 
> This change requires a new KIP; just posting it out here in case anyone who 
> uses static membership would like to pick it up.





[jira] [Created] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9445:


 Summary: Allow fetching a key from a single partition rather than 
iterating over all the stores on an instance
 Key: KAFKA-9445
 URL: https://issues.apache.org/jira/browse/KAFKA-9445
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar
Assignee: Navinder Brar


Currently, when expanding the KS cluster, the new node's partitions will be 
unavailable during the rebalance, which for large states can take a very long 
time; even for small state stores, more than a few ms of unavailability can be 
a deal-breaker for microservice use cases.

One workaround is to allow stale data to be read from the state stores when the 
use case allows it. Adding the use case from KAFKA-8994 as it is more descriptive.

"Consider the following scenario in a three node Streams cluster with node A, 
node B and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`  
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating A's state into its local disk, R just routes streams IQs 
to the active instance using StreamsMetadata
 * *t1*: IQs pick node R as the router, R forwards the query to A, and A 
responds back to R, which forwards the results back.
 * *t2:* Active instance A is killed and the rebalance begins. IQs to A start 
failing
 * *t3*: Rebalance assignment happens and standby B is promoted to active 
instance. IQs continue to fail
 * *t4*: B fully catches up to the changelog tail and rewinds offsets to A's 
last commit position; IQs continue to fail
 * *t5*: IQs to R get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on Kafka consumer group session/heartbeat timeouts, steps t2 and t3 
can take a few seconds (~10 seconds based on default values). Depending on how 
laggy standby B was prior to A being killed, t4 can take a few seconds to minutes. 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g. simple caches or dashboards). 

This issue aims to also expose information about standby B to R, during each 
rebalance such that the queries can be routed by an application to a standby to 
serve stale reads, choosing availability over consistency."





[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Labels:   (was: kip-535)

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>






[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Description: (was: Currently when expanding the KS cluster, the new 
node's partitions will be unavailable during the rebalance, which for large 
states can take a very long time, or for small state stores even more than a 
few ms can be a deal-breaker for micro service use cases.

One workaround is to allow stale data to be read from the state stores when use 
case allows. Adding the use case from KAFKA-8994 as it is more descriptive.

"Consider the following scenario in a three node Streams cluster with node A, 
node S and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`  
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating the A's state into its local disk, R just routes streams IQs 
to active instance using StreamsMetadata
 * *t1*: IQs pick node R as router, R forwards query to A, A responds back to R 
which reverse forwards back the results.
 * *t2:* Active A instance is killed and rebalance begins. IQs start failing to 
A
 * *t3*: Rebalance assignment happens and standby B is now promoted as active 
instance. IQs continue to fail
 * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
commit position, IQs continue to fail
 * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on Kafka consumer group session/heartbeat timeouts, step t2,t3 can 
take few seconds (~10 seconds based on defaults values). Depending on how laggy 
the standby B was prior to A being killed, t4 can take few seconds-minutes. 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g simple caches or dashboards). 

This issue aims to also expose information about standby B to R, during each 
rebalance such that the queries can be routed by an application to a standby to 
serve stale reads, choosing availability over consistency.")

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
>






[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Description: Whenever a call is made to get a particular key from a Kafka 
Streams instance, it currently returns a queryable store that contains a list 
of the stores for all the running and restoring/replica tasks (with KIP-535) on 
the instance via StreamThreadStateStoreProvider#stores(). This list of stores 
is then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
store one by one. With the changes that went in as part of KIP-535, since we 
have access to the information about which partition a key belongs to, we 
should have the capability to fetch the store for that particular partition and 
look for the key in that store only. This would be a good improvement to 
latencies for applications that host multiple partitions on a single instance 
and don't have bloom filters enabled internally for RocksDB.

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>
> Whenever a call is made to get a particular key from a Kafka Streams 
> instance, it currently returns a queryable store that contains a list of the 
> stores for all the running and restoring/replica tasks (with KIP-535) on the 
> instance via StreamThreadStateStoreProvider#stores(). This list of stores is 
> then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
> store one by one. With the changes that went in as part of KIP-535, since we 
> have access to the information about which partition a key belongs to, we 
> should have the capability to fetch the store for that particular partition 
> and look for the key in that store only. This would be a good improvement to 
> latencies for applications that host multiple partitions on a single instance 
> and don't have bloom filters enabled internally for RocksDB.
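
A partition-scoped lookup under this proposal might look roughly like the 
sketch below. Only queryMetadataForKey() comes from KIP-535; the 
partition-scoped store() parameters (and their method names) are hypothetical 
illustrations of the capability being requested, not an existing API:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SinglePartitionLookup {
    public static String lookup(KafkaStreams streams, String key) {
        // KIP-535: find out which partition this key maps to.
        KeyQueryMetadata metadata =
            streams.queryMetadataForKey("my-store", key, Serdes.String().serializer());
        // Hypothetical partition-scoped fetch: consult only the store for
        // that partition instead of iterating over all local stores.
        ReadOnlyKeyValueStore<String, String> store = streams.store(
            StoreQueryParameters
                .fromNameAndType("my-store", QueryableStoreTypes.<String, String>keyValueStore())
                .withPartition(metadata.partition()));
        return store.get(key);
    }
}
```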



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes

2020-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017370#comment-17017370
 ] 

Matthias J. Sax commented on KAFKA-9259:


Could KAFKA-8317 be a duplicate?

> suppress() for windowed-Serdes does not work with default serdes
> 
>
> Key: KAFKA-9259
> URL: https://issues.apache.org/jira/browse/KAFKA-9259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Major
>  Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or 
> falls back to the default serdes from the config.
> If the upstream operator is a windowed aggregation, the window-aggregation 
> operator wraps the user passed-in serde with a window serde and pushes it 
> into suppress() – however, if default serdes are used, the window-aggregation 
> operator cannot push anything into suppress(). At runtime, it just creates a 
> default serde and wraps it accordingly. For this case, suppress() also falls 
> back to the default serdes; however, it does not wrap the serde and thus a 
> ClassCastException is thrown when the serde is used later.
> suppress() already knows whether the upstream aggregation is time/session 
> windowed or not and thus should use this information to wrap the default 
> serdes accordingly.
> The current workaround for windowed suppress is to overwrite the default 
> serde upstream of suppress(), such that suppress() inherits serdes and does 
> not fall back to the default serdes.
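
The workaround can be sketched as follows (a minimal illustration; topic names 
and the concrete aggregation are assumptions). Passing serdes explicitly via 
Materialized lets the windowed aggregation wrap the key serde in a window serde 
and push both into suppress(), so suppress() never falls back to unwrapped 
default serdes:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressSerdeWorkaround {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            // Explicit serdes here: the aggregation wraps them with a window
            // serde and hands them to suppress(), avoiding the
            // ClassCastException seen with default serdes.
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
```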



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool

2020-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017385#comment-17017385
 ] 

Matthias J. Sax commented on KAFKA-9042:


Did you read how the KIP process works? The KIP must be discussed on the 
mailing list and be voted/accepted before we can merge code.

> Auto infer external topic partitions in stream reset tool
> -
>
> Key: KAFKA-9042
> URL: https://issues.apache.org/jira/browse/KAFKA-9042
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Assignee: highluck
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> As of today, users have to specify `--input-topic` in the stream reset tool to 
> be able to reset offsets to a specific position. For a stream job with 
> multiple external topics that need to be purged, users usually don't want to 
> name all the topics in order to reset the offsets. It's really painful to look 
> through the entire topology to make sure we purge all the committed offsets.
> We could add a flag `--reset-all-external-topics` to the reset tool such that, 
> when enabled, we delete offsets for all involved topics. The topic metadata 
> could be acquired by issuing a `DescribeGroup` request from the admin client; 
> it is stored in the member subscription information.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2020-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017388#comment-17017388
 ] 

Matthias J. Sax commented on KAFKA-7658:


Not sure what your question is?

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Aishwarya Pradeep Kumar
>Priority: Major
>  Labels: kip, newbie
>
> KIP-523: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL]
>  
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code:java}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interprets the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as syntactic sugar for a 
> dummy {{KStream.reduce()}} function that always takes the new value, as it has 
> the following differences:
> 1) an aggregation operator of {{KStream}} is for aggregating an event stream 
> into an evolving table, which will drop null-values from the input event 
> stream; whereas a {{toTable}} function completely changes the semantics 
> of the input stream from event stream to changelog stream, and null-values 
> will still be serialized, and if the resulting bytes are also null they will 
> be interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas the 
> KTable resulting from {{toTable}} may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, users who want to turn an event stream into a changelog stream 
> (no matter why they cannot read the source topic as a changelog stream 
> {{KTable}} in the first place) should use this new API instead of 
> a dummy reduction function.
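
Per KIP-523 linked above, the proposed API could be used roughly as follows (a 
sketch; topic and store names are assumptions):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ToTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Re-interpret the event stream as a changelog stream: each record is
        // an upsert, and a null value acts as a tombstone that deletes the key
        // from the materialized KTable.
        KTable<String, String> table = builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .toTable(Materialized.as("events-table"));
    }
}
```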



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9446) Integration test library should provide utilities to assert connector state

2020-01-16 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-9446:


 Summary: Integration test library should provide utilities to 
assert connector state
 Key: KAFKA-9446
 URL: https://issues.apache.org/jira/browse/KAFKA-9446
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton


The integration testing library for Connect could be expanded to include 
methods for verifying the state of connectors. Some possibilities are:

 

{{public boolean connectorIsRunning(String connector);}}

{{public boolean connectorIsFailed(String connector);}}

{{public boolean connectorIsRunningWithTasks(String connector, int numTasks);}}

{{public boolean connectorIsFailedWithTasksRunning(String connector, int 
numTasks);}}

{{public boolean connectorAndTasksAreFailed(String connector, int numTasks);}}

 

These could be used in conjunction with the various 
[waitForCondition|https://github.com/apache/kafka/blob/6d87c12729ac6dc9d39949c931fad4c45c6af841/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L335-L372]
 methods to easily wait for connectors to be started, failed, etc. during tests.

 

Functionality like this is already present in some integration tests, but is 
implemented on a per-test basis instead of as part of the integration testing 
library itself.
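
A sketch of how such a utility might pair with waitForCondition follows. The 
connectorIsRunning helper is the proposed, not-yet-existing method, and the 
REST-status lookup inside it is just one possible implementation:

```java
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;

public class ConnectorStateAssertions {

    // Wait until the connector reports RUNNING, or fail the test.
    public static void assertConnectorRunning(EmbeddedConnectCluster connect,
                                              String connector) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> connectorIsRunning(connect, connector),  // proposed utility
            30_000L,
            "Connector " + connector + " did not reach RUNNING state in time");
    }

    // One possible implementation: query the Connect REST status endpoint
    // and check the reported connector state (API shape is an assumption).
    private static boolean connectorIsRunning(EmbeddedConnectCluster connect, String connector) {
        return "RUNNING".equals(connect.connectorStatus(connector).connector().state());
    }
}
```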



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9235) Transaction state not cleaned up following StopReplica request

2020-01-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017517#comment-17017517
 ] 

ASF GitHub Bot commented on KAFKA-9235:
---

hachikuji commented on pull request #7963: KAFKA-9235; Ensure transaction 
coordinator resigns after replica deletion
URL: https://github.com/apache/kafka/pull/7963
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Transaction state not cleaned up following StopReplica request
> --
>
> Key: KAFKA-9235
> URL: https://issues.apache.org/jira/browse/KAFKA-9235
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> When the broker receives a StopReplica request from the controller for one of 
> the transaction state topics, we should make sure to cleanup existing state 
> in the TransactionCoordinator for the corresponding partition. We have 
> similar logic already for the group coordinator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8832) We should limit the maximum size read by a fetch request on the kafka server.

2020-01-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8832.

Resolution: Fixed

This is resolved by 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-541%3A+Create+a+fetch.max.bytes+configuration+for+the+broker.
 Please reopen if you disagree.

> We should limit the maximum size read by a fetch request on the kafka server.
> -
>
> Key: KAFKA-8832
> URL: https://issues.apache.org/jira/browse/KAFKA-8832
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.2.1
>Reporter: ChenLin
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-08-25-15-31-56-707.png, 
> image-2019-08-25-15-42-24-379.png, image-2019-08-29-11-01-04-147.png, 
> image-2019-08-29-11-01-17-347.png, image-2019-08-29-11-02-01-477.png, 
> image-2019-08-29-11-03-37-693.png, image-2019-08-29-11-21-49-998.png, 
> image-2019-08-29-11-23-53-155.png, image-2019-08-29-11-25-52-242.png
>
>
> I found that Kafka does not limit, on the server side, the amount of data 
> read per fetch request. This may cause the Kafka server to report an 
> error: OutOfMemory. Due to unreasonable client configuration, e.g. a 
> fetch.message.max.bytes configuration that is too large (such as 100M), the 
> Kafka server may receive a lot of fetch requests at a certain moment, causing 
> the server to report an OutOfMemory error. So I think this is a bug.
>    !image-2019-08-29-11-25-52-242.png!
> !image-2019-08-25-15-42-24-379.png!
> !image-2019-08-25-15-31-56-707.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8721) Metrics library upgrade

2020-01-16 Thread Mario Molina (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Molina updated KAFKA-8721:

Affects Version/s: (was: 2.3.0)
   2.4.0

> Metrics library upgrade
> ---
>
> Key: KAFKA-8721
> URL: https://issues.apache.org/jira/browse/KAFKA-8721
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.4.0
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Major
>
> The current metrics library which Kafka is using is pretty old (version 2.2.0 
> from Yammer, while Dropwizard is now at 4.1.0).
> In the latest versions of the Dropwizard library (which descends from the 
> now-deprecated Yammer library), there are a lot of bugfixes and new features 
> included which could be interesting for metrics (e.g. Reservoirs, JDK 9 
> support, etc.).
> This patch includes the upgrade to this new version of the library so that we 
> can add new features to Kafka metrics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8721) Metrics library upgrade

2020-01-16 Thread Mario Molina (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Molina updated KAFKA-8721:

Affects Version/s: 2.5.0

> Metrics library upgrade
> ---
>
> Key: KAFKA-8721
> URL: https://issues.apache.org/jira/browse/KAFKA-8721
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.4.0, 2.5.0
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Major
>
> The current metrics library which Kafka is using is pretty old (version 2.2.0 
> from Yammer, while Dropwizard is now at 4.1.0).
> In the latest versions of the Dropwizard library (which descends from the 
> now-deprecated Yammer library), there are a lot of bugfixes and new features 
> included which could be interesting for metrics (e.g. Reservoirs, JDK 9 
> support, etc.).
> This patch includes the upgrade to this new version of the library so that we 
> can add new features to Kafka metrics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9431) Expose API in KafkaStreams to fetch all local offset lags

2020-01-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017532#comment-17017532
 ] 

ASF GitHub Bot commented on KAFKA-9431:
---

vvcephei commented on pull request #7961: KAFKA-9431: Expose API in 
KafkaStreams to fetch all local offset lags
URL: https://github.com/apache/kafka/pull/7961
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expose API in KafkaStreams to fetch all local offset lags
> -
>
> Key: KAFKA-9431
> URL: https://issues.apache.org/jira/browse/KAFKA-9431
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9428) Expose standby information in KafkaStreams via queryMetadataForKey API

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9428.
-
Resolution: Fixed

> Expose standby information in KafkaStreams via queryMetadataForKey API
> --
>
> Key: KAFKA-9428
> URL: https://issues.apache.org/jira/browse/KAFKA-9428
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9429) Allow ability to control whether stale reads out of state stores are desirable

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9429.
-
Resolution: Fixed

> Allow ability to control whether stale reads out of state stores are desirable
> --
>
> Key: KAFKA-9429
> URL: https://issues.apache.org/jira/browse/KAFKA-9429
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.5.0
>
>
> From John :
>  
> I also meant to talk with you about the change to allow querying recovering 
> stores. I think you might have already talked with Matthias a little about 
> this in the scope of KIP-216, but it's probably not ok to just change the 
> default from only allowing query while running, since there are actually 
> people depending on full-consistency queries for correctness right now.
>  
> What we can do is add an overload {{KafkaStreams.store(name, 
> QueriableStoreType, QueriableStoreOptions)}}, with one option: 
> {{queryStaleState(true/false)}} (your preference on the name, I just made 
> that up right now). The default would be false, and KSQL would set it to 
> true. While false, it would not allow querying recovering stores OR standbys. 
> This basically allows a single switch to preserve existing behavior.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9430:

Description: 
Right now, we use _endOffsets_ of the source topic for the computation. Since 
the source topics can also have direct user event produces, this is an 
overestimate.

 

From John:

For "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
over-estimate (which seems better than an under-estimate), and it's also still 
an apples-to-apples comparison, since all replicas would use the same upper 
bound to compute their lags, so the "pick the freshest" replica is still going 
to pick the right one.

  was:
Right now, we use _endOffsets_ of the source topic for the computation. Since 
the source topics can also have direct user event produces, this is an 
overestimate.

 

From John:

For "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
over-estimate (which seems better than an under-estimate), and it's also still 
an apples-to-apples comparison, since all replicas would use the same upper 
bound to compute their lags, so the "pick the freshest" replica is still going 
to pick the right one. We can add a new 2.5 blocker ticket to really fix it, 
and not worry about it until after this KSQL stuff is done.

 

For the active task: we need to use consumed offsets, not the end offset of the source topic


> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.5.0
>
>
> Right now, we use _endOffsets_ of the source topic for the computation. Since 
> the source topics can also have direct user event produces, this is an overestimate.
>  
> From John:
> For "optimized" changelogs, this will be wrong, strictly speaking, but it's 
> an over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9430:

Priority: Blocker  (was: Major)

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. Since 
> the source topics can also have direct user event produces, this is an overestimate.
>  
> From John:
> For "optimized" changelogs, this will be wrong, strictly speaking, but it's 
> an over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9430:

Affects Version/s: 2.5.0

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>
> Right now, we use _endOffsets_ of the source topic for the computation. Since 
> the source topics can also have direct user event produces, this is an overestimate.
>  
> From John:
> For "optimized" changelogs, this will be wrong, strictly speaking, but it's 
> an over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9430:

Fix Version/s: (was: 2.5.0)

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>
> Right now, we use _endOffsets_ of the source topic for the computation. Since 
> the source topics can also have direct user event produces, this is an overestimate.
>  
> From John:
> For "optimized" changelogs, this will be wrong, strictly speaking, but it's 
> an over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9235) Transaction state not cleaned up following StopReplica request

2020-01-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9235.

Fix Version/s: 2.4.1
   Resolution: Fixed

> Transaction state not cleaned up following StopReplica request
> --
>
> Key: KAFKA-9235
> URL: https://issues.apache.org/jira/browse/KAFKA-9235
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.1
>
>
> When the broker receives a StopReplica request from the controller for one of 
> the transaction state topics, we should make sure to cleanup existing state 
> in the TransactionCoordinator for the corresponding partition. We have 
> similar logic already for the group coordinator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9430:
---

Assignee: John Roesler  (was: Vinoth Chandar)

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Assignee: John Roesler
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. For 
> "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
> over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.
> The current implementation is technically correct, within the documented 
> behavior that the result is an "estimate", but I marked it as a blocker to be 
> sure that we revisit it after ongoing work to refactor the task management in 
> Streams is complete. If it becomes straightforward to tighten up the 
> estimate, we should go ahead and do it. Otherwise, we can downgrade the 
> priority of the ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9430:

Description: 
Right now, we use _endOffsets_ of the source topic for the computation. For 
"optimized" changelogs, this will be wrong, strictly speaking, but it's an 
over-estimate (which seems better than an under-estimate), and it's also still 
an apples-to-apples comparison, since all replicas would use the same upper 
bound to compute their lags, so the "pick the freshest" replica is still going 
to pick the right one.

The current implementation is technically correct, within the documented 
behavior that the result is an "estimate", but I marked it as a blocker to be 
sure that we revisit it after ongoing work to refactor the task management in 
Streams is complete. If it becomes straightforward to tighten up the estimate, 
we should go ahead and do it. Otherwise, we can downgrade the priority of the 
ticket.

  was:
Right now, we use _endOffsets_ of the source topic for the computation. Since 
the source topics can also have direct user event produces, this is an 
overestimate.

 

From John:

For "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
over-estimate (which seems better than an under-estimate), and it's also still 
an apples-to-apples comparison, since all replicas would use the same upper 
bound to compute their lags, so the "pick the freshest" replica is still going 
to pick the right one.


> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. For 
> "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
> over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.
> The current implementation is technically correct, within the documented 
> behavior that the result is an "estimate", but I marked it as a blocker to be 
> sure that we revisit it after ongoing work to refactor the task management in 
> Streams is complete. If it becomes straightforward to tighten up the 
> estimate, we should go ahead and do it. Otherwise, we can downgrade the 
> priority of the ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9430:
---

Assignee: (was: John Roesler)

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. For 
> "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
> over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.
> The current implementation is technically correct, within the documented 
> behavior that the result is an "estimate", but I marked it as a blocker to be 
> sure that we revisit it after ongoing work to refactor the task management in 
> Streams is complete. If it becomes straightforward to tighten up the 
> estimate, we should go ahead and do it. Otherwise, we can downgrade the 
> priority of the ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9431) Expose API in KafkaStreams to fetch all local offset lags

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9431.
-
Resolution: Fixed

> Expose API in KafkaStreams to fetch all local offset lags
> -
>
> Key: KAFKA-9431
> URL: https://issues.apache.org/jira/browse/KAFKA-9431
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9430:

Parent: (was: KAFKA-6144)
Issue Type: Improvement  (was: Sub-task)

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. For 
> "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
> over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.
> The current implementation is technically correct, within the documented 
> behavior that the result is an "estimate", but I marked it as a blocker to be 
> sure that we revisit it after ongoing work to refactor the task management in 
> Streams is complete. If it becomes straightforward to tighten up the 
> estimate, we should go ahead and do it. Otherwise, we can downgrade the 
> priority of the ticket.





[jira] [Updated] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-6144:

Fix Version/s: 2.5.0

> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Fix For: 2.5.0
>
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently, when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time; for small state stores, even more than a few ms of unavailability can be 
> a deal-breaker in microservice use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows. Adding the use case from KAFKA-8994 as it is more 
> descriptive.
> "Consider the following scenario in a three node Streams cluster with node A, 
> node B and node R, executing a stateful sub-topology/topic group with 1 
> partition and `_num.standby.replicas=1_`  
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating the A's state into its local disk, R just routes streams 
> IQs to active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router, R forwards query to A, A responds back to 
> R which reverse forwards back the results.
>  * *t2:* Active A instance is killed and rebalance begins. IQs start failing 
> to A
>  * *t3*: Rebalance assignment happens and standby B is now promoted as active 
> instance. IQs continue to fail
>  * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
> commit position, IQs continue to fail
>  * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
> start succeeding again
>  
> Depending on Kafka consumer group session/heartbeat timeouts, steps t2 and t3 
> can take a few seconds (~10 seconds based on default values). Depending on how 
> laggy the standby B was prior to A being killed, t4 can take from a few 
> seconds to minutes. 
> While this behavior favors consistency over availability at all times, the 
> long unavailability window might be undesirable for certain classes of 
> applications (e.g simple caches or dashboards). 
> This issue aims to also expose information about standby B to R, during each 
> rebalance such that the queries can be routed by an application to a standby 
> to serve stale reads, choosing availability over consistency."
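The t0–t5 timeline above boils down to a routing decision R has to make. A simplified, self-contained sketch of the intended behaviour (hypothetical names and threshold; the real Streams API exposes the needed metadata via StreamsMetadata, not this interface): prefer the active instance, and when it is unavailable fall back to the least-lagged standby, accepting stale reads in exchange for availability.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

public class IqRouter {
    // Route an interactive query: prefer the active instance; if it is down
    // (e.g. between t2 and t5 above), fall back to the least-lagged standby
    // whose lag is still acceptable, choosing availability over consistency.
    static Optional<String> route(String active, boolean activeAlive,
                                  Map<String, Long> standbyLags, long maxAcceptableLag) {
        if (activeAlive) {
            return Optional.of(active);
        }
        return standbyLags.entrySet().stream()
                .filter(e -> e.getValue() <= maxAcceptableLag)
                .min(Comparator.comparingLong(Map.Entry::getValue))
                .map(Map.Entry::getKey);
    }
}
```

If no standby is within the acceptable lag, the router returns empty and the application can choose to fail the query instead of serving arbitrarily stale data.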





[jira] [Resolved] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2020-01-16 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-6144.
-
Resolution: Fixed

> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Fix For: 2.5.0
>
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently, when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time; for small state stores, even more than a few ms of unavailability can be 
> a deal-breaker in microservice use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows. Adding the use case from KAFKA-8994 as it is more 
> descriptive.
> "Consider the following scenario in a three node Streams cluster with node A, 
> node B and node R, executing a stateful sub-topology/topic group with 1 
> partition and `_num.standby.replicas=1_`  
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating the A's state into its local disk, R just routes streams 
> IQs to active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router, R forwards query to A, A responds back to 
> R which reverse forwards back the results.
>  * *t2:* Active A instance is killed and rebalance begins. IQs start failing 
> to A
>  * *t3*: Rebalance assignment happens and standby B is now promoted as active 
> instance. IQs continue to fail
>  * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
> commit position, IQs continue to fail
>  * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
> start succeeding again
>  
> Depending on Kafka consumer group session/heartbeat timeouts, steps t2 and t3 
> can take a few seconds (~10 seconds based on default values). Depending on how 
> laggy the standby B was prior to A being killed, t4 can take from a few 
> seconds to minutes. 
> While this behavior favors consistency over availability at all times, the 
> long unavailability window might be undesirable for certain classes of 
> applications (e.g simple caches or dashboards). 
> This issue aims to also expose information about standby B to R, during each 
> rebalance such that the queries can be routed by an application to a standby 
> to serve stale reads, choosing availability over consistency."





[jira] [Assigned] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar reassigned KAFKA-9430:
-

Assignee: Vinoth Chandar

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. For 
> "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
> over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.
> The current implementation is technically correct, within the documented 
> behavior that the result is an "estimate", but I marked it as a blocker to be 
> sure that we revisit it after ongoing work to refactor the task management in 
> Streams is complete. If it becomes straightforward to tighten up the 
> estimate, we should go ahead and do it. Otherwise, we can downgrade the 
> priority of the ticket.





[jira] [Commented] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-16 Thread Vinoth Chandar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017570#comment-17017570
 ] 

Vinoth Chandar commented on KAFKA-9430:
---

[~vvcephei] , is there a Jira for the refactor? we can track the dependency by 
linking it. 

> Tighten up lag estimates when source topic optimization is on 
> --
>
> Key: KAFKA-9430
> URL: https://issues.apache.org/jira/browse/KAFKA-9430
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Blocker
>
> Right now, we use _endOffsets_ of the source topic for the computation. For 
> "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
> over-estimate (which seems better than an under-estimate), and it's also 
> still an apples-to-apples comparison, since all replicas would use the same 
> upper bound to compute their lags, so the "pick the freshest" replica is 
> still going to pick the right one.
> The current implementation is technically correct, within the documented 
> behavior that the result is an "estimate", but I marked it as a blocker to be 
> sure that we revisit it after ongoing work to refactor the task management in 
> Streams is complete. If it becomes straightforward to tighten up the 
> estimate, we should go ahead and do it. Otherwise, we can downgrade the 
> priority of the ticket.





[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool

2020-01-16 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017571#comment-17017571
 ] 

highluck commented on KAFKA-9042:
-

[~mjsax]

Thank you for your reply, and sorry. It was exciting to think that I could 
contribute to a very attractive project, so I got a little carried away.

I will be more careful.

Thank you!

> Auto infer external topic partitions in stream reset tool
> -
>
> Key: KAFKA-9042
> URL: https://issues.apache.org/jira/browse/KAFKA-9042
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Assignee: highluck
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> As of today, the user has to specify `--input-topic` in the stream reset tool 
> to be able to reset offsets to a specific position. For a stream job with 
> multiple external topics that need to be purged, users usually don't want to 
> name all the topics in order to reset the offsets. It's really painful to look 
> through the entire topology to make sure we purge all the committed offsets.
> We could add a config `--reset-all-external-topics` to the reset tool such 
> that when enabled, we could delete offsets for all involved topics. The topic 
> metadata could be acquired by issuing a `DescribeGroup` request from admin 
> client, which is stored in the member subscription information.
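The topic-discovery step the ticket proposes can be sketched in plain Java (hypothetical method and data shapes; in the real tool the per-member subscriptions would come from an admin-client describe-group call rather than a hard-coded map): take the union of every member's subscribed topics to find all offsets the reset tool should delete.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ResetTopicDiscovery {
    // Given each group member's subscribed topics (as stored in the member
    // subscription metadata), compute the full set of external topics whose
    // committed offsets the reset tool should delete.
    static Set<String> topicsToReset(Map<String, List<String>> memberSubscriptions) {
        Set<String> topics = new TreeSet<>();  // sorted, duplicate-free union
        memberSubscriptions.values().forEach(topics::addAll);
        return topics;
    }
}
```

This way the user never has to enumerate topics by hand; the group metadata already names every topic the topology consumes.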





[jira] [Commented] (KAFKA-9042) Auto infer external topic partitions in stream reset tool

2020-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017575#comment-17017575
 ] 

Matthias J. Sax commented on KAFKA-9042:


All good :) Nothing to worry about. We appreciate your efforts to contribute!

> Auto infer external topic partitions in stream reset tool
> -
>
> Key: KAFKA-9042
> URL: https://issues.apache.org/jira/browse/KAFKA-9042
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Assignee: highluck
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> As of today, the user has to specify `--input-topic` in the stream reset tool 
> to be able to reset offsets to a specific position. For a stream job with 
> multiple external topics that need to be purged, users usually don't want to 
> name all the topics in order to reset the offsets. It's really painful to look 
> through the entire topology to make sure we purge all the committed offsets.
> We could add a config `--reset-all-external-topics` to the reset tool such 
> that when enabled, we could delete offsets for all involved topics. The topic 
> metadata could be acquired by issuing a `DescribeGroup` request from admin 
> client, which is stored in the member subscription information.





[jira] [Commented] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes

2020-01-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017671#comment-17017671
 ] 

John Roesler commented on KAFKA-9259:
-

Ah, yes, it’s the same issue. Bummer we didn’t notice before having a whole 
separate conversation here. I guess we can close 8317, even though it’s older, 
because there is progress on this ticket. I can’t right now from my phone, 
though. 

> suppress() for windowed-Serdes does not work with default serdes
> 
>
> Key: KAFKA-9259
> URL: https://issues.apache.org/jira/browse/KAFKA-9259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Major
>  Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or 
> falls back to default serdes from the config.
> If the upstream operator is an windowed aggregation, the window-aggregation 
> operator wraps the user passed-in serde with a window-serde and pushed it 
> into suppress() – however, if default serdes are used, the window-aggregation 
> operator cannot push anything into suppress(). At runtime, it just creates a 
> default serde and wraps it according. For this case, suppress() also falls 
> back to default serdes; however, it does not wrap the serde and thus a 
> ClassCastException is thrown when the serde is used later.
> suppress() is already aware if the upstream aggregation is time/session 
> windowed or not and thus should use this information to wrap default serdes 
> accordingly.
> The current workaround for windowed-suppress is to overwrite the default 
> serde upstream to suppress(), such that suppress() inherits serdes and does 
> not fall back to default serdes.





[jira] [Commented] (KAFKA-9441) Refactor commit logic

2020-01-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017673#comment-17017673
 ] 

ASF GitHub Bot commented on KAFKA-9441:
---

mjsax commented on pull request #7977: KAFKA-9441: Refactor Kafka Streams 
commit logic
URL: https://github.com/apache/kafka/pull/7977
 
 
   - part of KIP-447
   - create one producer per thread if eos-beta enabled
   - commit all tasks at once if eos-beta enabled
   
   Call for review @abbccdda @guozhangwang 
   
 



> Refactor commit logic
> -
>
> Key: KAFKA-9441
> URL: https://issues.apache.org/jira/browse/KAFKA-9441
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> Using a producer per thread in combination with EOS, it is no longer possible 
> to commit individual tasks independently (as done currently).
> We need to refactor StreamsThread to commit all tasks at the same time under 
> the new model.





[jira] [Updated] (KAFKA-9444) Cannot connect to zookeeper after updating the kafka config

2020-01-16 Thread Rishabh Bohra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rishabh Bohra updated KAFKA-9444:
-
Description: 
h4. *Issue:*

While connecting Kafka to ZooKeeper at a custom path, broker-0 logs this error 
message:

{{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
(kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
the wrong cluster. Configured zookeeper.connect may be wrong. at 
kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}

*Steps:*

{{1. Start the kafka cluster without any zookeeper path/url}}
 {{2. Update the kafka configuration with the provided path for zookeeper.}}
 {{3. Check broker logs}}

*Package Version:*

Zookeeper: 3.4.14
 Zookeeper Client - Zookeeper-3.4.14.jar
 Kafka - 2.12-2.4.0

Similar issue can be found 
[here|https://stackoverflow.com/questions/59592518/kafka-broker-doesnt-find-cluster-id-and-creates-new-one-after-docker-restart]

  was:
h4. *Issue:*

While connecting the kafka with zookeeper at a custom path, in the broker-0, 
this error message pops up-

{{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
(kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
the wrong cluster. Configured zookeeper.connect may be wrong. at 
kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}

*Steps:*

{{1. Start the kafka cluster without any zookeeper path/url}}
 {{2. Update the kafka configuration with the provided path for zookeeper.}}
 {{3. Check broker logs}}

*Package Version:*

Zookeeper: 3.4.14
 Zookeeper Client - Zookeeper-3.4.14.jar
 Kafka - 2.12-2.4.0


> Cannot connect to zookeeper after updating the kafka config
> ---
>
> Key: KAFKA-9444
> URL: https://issues.apache.org/jira/browse/KAFKA-9444
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Rishabh Bohra
>Priority: Major
>  Labels: mesosphere
>
> h4. *Issue:*
> While connecting Kafka to ZooKeeper at a custom path, broker-0 logs this error 
> message:
> {{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
> Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
> Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
> the wrong cluster. Configured zookeeper.connect may be wrong. at 
> kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
> kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}
> *Steps:*
> {{1. Start the kafka cluster without any zookeeper path/url}}
>  {{2. Update the kafka configuration with the provided path for zookeeper.}}
>  {{3. Check broker logs}}
> *Package Version:*
> Zookeeper: 3.4.14
>  Zookeeper Client - Zookeeper-3.4.14.jar
>  Kafka - 2.12-2.4.0
> Similar issue can be found 
> [here|https://stackoverflow.com/questions/59592518/kafka-broker-doesnt-find-cluster-id-and-creates-new-one-after-docker-restart]





[jira] [Updated] (KAFKA-9444) Cannot connect to zookeeper after updating the kafka config

2020-01-16 Thread Rishabh Bohra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rishabh Bohra updated KAFKA-9444:
-
External issue URL: 
https://stackoverflow.com/questions/59592518/kafka-broker-doesnt-find-cluster-id-and-creates-new-one-after-docker-restart

> Cannot connect to zookeeper after updating the kafka config
> ---
>
> Key: KAFKA-9444
> URL: https://issues.apache.org/jira/browse/KAFKA-9444
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Rishabh Bohra
>Priority: Major
>  Labels: mesosphere
>
> h4. *Issue:*
> While connecting Kafka to ZooKeeper at a custom path, broker-0 logs this error 
> message:
> {{ERROR Fatal error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The 
> Cluster ID 2yEEELdtRfKOJQiEurdoFg doesn't match stored clusterId 
> Some(H8dPCWwzRCK4eDmH3l5vvA) in meta.properties. The broker is trying to join 
> the wrong cluster. Configured zookeeper.connect may be wrong. at 
> kafka.server.KafkaServer.startup(KafkaServer.scala:220) at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at 
> kafka.Kafka$.main(Kafka.scala:84) at kafka.Kafka.main(Kafka.scala)}}
> *Steps:*
> {{1. Start the kafka cluster without any zookeeper path/url}}
>  {{2. Update the kafka configuration with the provided path for zookeeper.}}
>  {{3. Check broker logs}}
> *Package Version:*
> Zookeeper: 3.4.14
>  Zookeeper Client - Zookeeper-3.4.14.jar
>  Kafka - 2.12-2.4.0
> Similar issue can be found 
> [here|https://stackoverflow.com/questions/59592518/kafka-broker-doesnt-find-cluster-id-and-creates-new-one-after-docker-restart]





[jira] [Created] (KAFKA-9447) Add examples for EOS standalone and group mode under kafka/examples

2020-01-16 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9447:
--

 Summary: Add examples for EOS standalone and group mode under 
kafka/examples
 Key: KAFKA-9447
 URL: https://issues.apache.org/jira/browse/KAFKA-9447
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


Although we have integration tests for the EOS model, it would be good to also 
provide examples under kafka/examples for people to use.





[jira] [Created] (KAFKA-9448) Meta data for topic

2020-01-16 Thread Pradeep Bansal (Jira)
Pradeep Bansal created KAFKA-9448:
-

 Summary: Meta data for topic 
 Key: KAFKA-9448
 URL: https://issues.apache.org/jira/browse/KAFKA-9448
 Project: Kafka
  Issue Type: Improvement
Reporter: Pradeep Bansal


I am looking for a way to get details about a topic in my Kafka setup: which 
user created it, when it was created, and any other associated metadata. I am 
using an authenticated and authorized Kafka setup.





[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Labels: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
  (was: )

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>  Labels: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
>
> Whenever a call is made to get a particular key from a Kafka Streams 
> instance, it currently returns a queryable store that contains a list of the 
> stores for all the running and restoring/replica tasks (with KIP-535) on the 
> instance via StreamThreadStateStoreProvider#stores(). This list of stores is 
> then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
> store one by one. With the changes that went in as a part of KIP-535, since we 
> now know which partition a key belongs to, we should be able to fetch the 
> store for that particular partition and look for the key in that store only. 
> It would be a good improvement to latency for applications that host multiple 
> partitions on a single instance and don't have bloom filters enabled 
> internally for RocksDB.
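The proposed improvement can be illustrated with a toy model (plain Java; the list of maps stands in for the per-partition state stores, and the hash-based partition choice is an assumption mirroring the default partitioner, not the actual Streams code): instead of probing every store, compute the key's partition and probe exactly one.

```java
import java.util.List;
import java.util.Map;

public class SinglePartitionLookup {
    // Each partition's state store, modelled here as a plain map.
    static String get(List<Map<String, String>> storesByPartition, String key) {
        // Default-partitioner-style choice: hash the key to a single partition...
        int partition = (key.hashCode() & 0x7fffffff) % storesByPartition.size();
        // ...and query only that partition's store, instead of probing every
        // store in the list one by one.
        return storesByPartition.get(partition).get(key);
    }
}
```

With N partitions hosted on one instance, this turns up to N point lookups into one, which is where the latency win for bloom-filter-less RocksDB comes from.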





[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Labels: KIP-562  (was: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance)

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>  Labels: KIP-562
>
> Whenever a call is made to get a particular key from a Kafka Streams 
> instance, it currently returns a queryable store that contains a list of the 
> stores for all the running and restoring/replica tasks (with KIP-535) on the 
> instance via StreamThreadStateStoreProvider#stores(). This list of stores is 
> then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
> store one by one. With the changes that went in as a part of KIP-535, since we 
> now know which partition a key belongs to, we should be able to fetch the 
> store for that particular partition and look for the key in that store only. 
> It would be a good improvement to latency for applications that host multiple 
> partitions on a single instance and don't have bloom filters enabled 
> internally for RocksDB.





[jira] [Commented] (KAFKA-9356) Potential data loss in InMemoryWindowStore and InMemorySessionStore

2020-01-16 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017748#comment-17017748
 ] 

Sophie Blee-Goldman commented on KAFKA-9356:


As [~bchen225242] said, Streams is single writer so this should not cause any 
problems. I'm going to close this bug as "Not a Problem" 

It's worth noting that it might be a good idea to switch to TreeMap for 
different reasons. Right now the ConcurrentSkipListMap allows us to safely 
perform range queries without copying over the entire keyset, but the 
performance on point queries seems to scale noticeably worse with the number of 
unique keys. Point queries are used by aggregations while range queries are 
used by windowed joins, but of course both are available within the PAPI and 
for interactive queries so it's hard to say which we should prefer. Maybe 
rather than make that tradeoff we should have one version for efficient range 
queries (a "JoinWindowStore") and one for efficient point queries 
("AggWindowStore") – or something. I know we've had similar thoughts for a 
different RocksDB store layout for Joins (although I can't find that ticket 
anywhere..), it seems like the in-memory stores could benefit from a special 
"Join" version as well cc/ [~guozhang]
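For reference, the ticket's proposed replacement would look roughly like this (a JDK-only sketch; whether the swap is worthwhile depends on the range-vs-point query trade-off discussed above):

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SynchronizedStoreMap {
    // A synchronized TreeMap: put/computeIfAbsent hold the map's monitor, so
    // each compound update runs atomically, unlike
    // ConcurrentSkipListMap.computeIfAbsent, whose mapping function carries
    // weaker atomicity guarantees. The trade-off: range iteration must also
    // synchronize on the map, whereas the skip list allows lock-free
    // concurrent range scans.
    static <K, V> NavigableMap<K, V> create() {
        return Collections.synchronizedNavigableMap(new TreeMap<>());
    }
}
```

Since Streams is single-writer, the original atomicity concern does not apply, which is why the ticket was closed; the sketch only shows what the suggested fix would have been.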

> Potential data loss in InMemoryWindowStore and InMemorySessionStore
> ---
>
> Key: KAFKA-9356
> URL: https://issues.apache.org/jira/browse/KAFKA-9356
> Project: Kafka
>  Issue Type: Bug
>Reporter: Roman Leventov
>Priority: Major
>
> {{InMemoryWindowStore.put()}} and {{InMemorySessionStore.put()}} call 
> {{computeIfAbsent()}} method on {{ConcurrentSkipListMap}} objects which opens 
> up possibility for data loss because 
> {{ConcurrentSkipListMap.computeIfAbsent()}} is not an atomic operation.
> Possible fix: replace {{ConcurrentSkipListMaps with synchronized[Sorted, 
> Navigable]Map(new TreeMap<>())}}.
>   





[jira] [Resolved] (KAFKA-9356) Potential data loss in InMemoryWindowStore and InMemorySessionStore

2020-01-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-9356.

Resolution: Not A Problem

> Potential data loss in InMemoryWindowStore and InMemorySessionStore
> ---
>
> Key: KAFKA-9356
> URL: https://issues.apache.org/jira/browse/KAFKA-9356
> Project: Kafka
>  Issue Type: Bug
>Reporter: Roman Leventov
>Priority: Major
>
> {{InMemoryWindowStore.put()}} and {{InMemorySessionStore.put()}} call 
> {{computeIfAbsent()}} method on {{ConcurrentSkipListMap}} objects which opens 
> up possibility for data loss because 
> {{ConcurrentSkipListMap.computeIfAbsent()}} is not an atomic operation.
> Possible fix: replace {{ConcurrentSkipListMaps with synchronized[Sorted, 
> Navigable]Map(new TreeMap<>())}}.
>   





[jira] [Updated] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-16 Thread leibo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

leibo updated KAFKA-8532:
-
Description: 
We have observed a serious deadlock between the controller-event-thread and the 
zk-session-expiry-handler thread. When this issue occurs, the only way to 
recover the Kafka cluster is to restart the Kafka server. The following is the 
jstack log of the controller-event-thread and zk-session-expiry-handler thread.

"zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0005ee3f7000> (a 
java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
waiting for the controller-event-thread to process the Expire event
 at 
kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
 at 
kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
 at 
kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
 at 
kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
 at 
kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
 at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
Source)
 at scala.collection.Iterator.foreach(Iterator.scala:937)
 at scala.collection.Iterator.foreach$(Iterator.scala:937)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
 at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
 at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
 at 
kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
 at kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
Source)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
 at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
Source)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
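Stripped to its essentials, the circular wait reported here can be modelled with two latches: the expiry handler waits for the controller to start processing the Expire event, while the controller is blocked waiting for a ZooKeeper reply that cannot arrive until the expiry handler reinitializes the session. The sketch below is a simplified model (names like {{runScenario}} are illustrative, not Kafka code); a timeout stands in for the real hang so the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDeadlockSketch {

    // Returns true if the modelled Expire event ever started processing;
    // in this deadlock model it always times out and returns false.
    static boolean runScenario(long timeoutMs) throws InterruptedException {
        CountDownLatch processingStarted = new CountDownLatch(1);
        CountDownLatch zkResponse = new CountDownLatch(1);

        Thread controller = new Thread(() -> {
            try {
                zkResponse.await();            // like handleRequests(): waits for a ZK reply
                processingStarted.countDown(); // never reached: the reply needs the session back
            } catch (InterruptedException ignored) { }
        }, "controller-event-thread");
        controller.start();

        // Like beforeInitializingSession(): the expiry handler refuses to
        // reinitialize the session until the controller starts processing
        // the Expire event, so zkResponse is never counted down either.
        boolean started = processingStarted.await(timeoutMs, TimeUnit.MILLISECONDS);

        controller.interrupt(); // break the modelled cycle so the demo exits
        controller.join();
        return started;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Expire event started processing: " + runScenario(500));
        // prints: Expire event started processing: false
    }
}
```

Each thread holds the precondition the other is waiting on, which is why only a broker restart clears the real deadlock.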

"controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 nid=0x310 
waiting on condition [0x7fccb55c8000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0005d1be5a00> (a 
java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
 at 
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1589)
 at 
kafka.zk.KafkaZkClient.deletePreferredReplicaElection(KafkaZkClient.scala:989)
 at 
kafka.controller.KafkaController.removePartitionsFromPreferredReplicaElection(KafkaController.scala:873)
 at 
kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:631)
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:266)
 at 
kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1221)
 at 
kafka.controller.KafkaController$Reelect$.pr