[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to send regardless of value for max.in.flight.requests.per.connection

2017-07-26 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101249#comment-16101249
 ] 

huxihx commented on KAFKA-5641:
---

[~ijuma]
"The broker only processes one request at a time, so even if you send the 
metadata request, it won't be processed until the processing of the previous 
request is completed"
Seems this does not apply to METADATA requests, since the sender thread only 
drains ProducerBatches and ensures ordering by muting all batches from the 
same partitions before sending them out. 

In my opinion, NetworkClient could always process Metadata requests. The idea 
behind this JIRA is that we need not honor 
`max.in.flight.requests.per.connection` when updating the metadata. That is to 
say, we could create a new method `canSendMoreNonProduceRequest` which ignores 
the in-flight queue size. 

Do I make myself clear?
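
For illustration, here is a minimal sketch of the idea. The first method 
paraphrases NetworkClient's existing check; the second is hypothetical and not 
in the codebase:

{code:java}
// Existing gate in NetworkClient: every send, including Metadata, is bounded
// by the in-flight queue, i.e. by max.in.flight.requests.per.connection.
private boolean canSendRequest(String node) {
    return connectionStates.isReady(node) && selector.isChannelReady(node) &&
        inFlightRequests.canSendMore(node);
}

// Hypothetical variant for non-produce requests such as Metadata: the same
// readiness checks, but without the queue-size bound.
private boolean canSendMoreNonProduceRequest(String node) {
    return connectionStates.isReady(node) && selector.isChannelReady(node);
}
{code}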

> Metadata request should always be allowed to send regardless of value for 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId)) {
>         ..
>     }
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 actually means 
> no out-of-order delivery for the produced records; Metadata requests should 
> have no relation to this config. We don't have to check the in-flight request 
> queue's size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.





[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5501:

Summary: introduce async ZookeeperClient  (was: use async zookeeper apis 
everywhere)

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper writes mean that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.





[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere

2017-07-26 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5642:
---

 Summary: use async ZookeeperClient everywhere
 Key: KAFKA-5642
 URL: https://issues.apache.org/jira/browse/KAFKA-5642
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Synchronous zookeeper writes mean that we wait an entire round trip before 
doing the next write. These synchronous writes are happening at a per-partition 
granularity in several places, so partition-heavy clusters suffer from the 
controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.

KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
requests to zookeeper. We should replace ZkClient's usage with this client.
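
To make the difference concrete, here is a rough sketch against the raw 
org.apache.zookeeper.ZooKeeper client (the ZookeeperClient from KAFKA-5501 
wraps this kind of pipelining; the paths and data below are placeholders for 
the per-partition leaderAndIsr znodes):

{code:java}
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.ZooKeeper;

public class ZkWriteStyles {
    // Synchronous style: one blocking round trip per znode update.
    static void serialWrites(ZooKeeper zk, List<String> paths, byte[] data) throws Exception {
        for (String path : paths)
            zk.setData(path, data, -1); // blocks until the server responds
    }

    // Pipelined style: issue all writes at once, then wait once at the end.
    static void pipelinedWrites(ZooKeeper zk, List<String> paths, byte[] data) throws Exception {
        CountDownLatch done = new CountDownLatch(paths.size());
        for (String path : paths)
            zk.setData(path, data, -1, (rc, p, ctx, stat) -> done.countDown(), null);
        done.await(); // all requests were in flight concurrently
    }
}
{code}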





[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5501:

Description: Synchronous zookeeper apis mean that we wait an entire round 
trip before doing the next operation. We should introduce a zookeeper client 
that encourages pipelined requests to zookeeper.  (was: Synchronous zookeeper 
writes mean that we wait an entire round trip before doing the next write. 
These synchronous writes are happening at a per-partition granularity in 
several places, so partition-heavy clusters suffer from the controller doing 
many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.)

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis mean that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.





[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5501.
-
Resolution: Fixed

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis mean that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.





[jira] [Commented] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101268#comment-16101268
 ] 

Onur Karaman commented on KAFKA-5501:
-

[~ijuma] I went ahead and reworded this ticket to be about making the client 
and KAFKA-5642 to be about using the client. With that, I went ahead and closed 
this ticket.

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis mean that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.





[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to send regardless of value for max.in.flight.requests.per.connection

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101273#comment-16101273
 ] 

Ismael Juma commented on KAFKA-5641:


My comment was about the broker. And yes, the broker applies head of line 
blocking so that it processes one request at a time. If you send multiple 
requests, all but one are kept in the socket buffer. There is logic for this in 
the Selector.
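
For reference, the mechanism is roughly the following (a conceptual sketch of 
the broker's use of the common-network Selector, not the actual Processor 
code):

{code:java}
// After a complete request is read from a connection, the broker mutes the
// channel so no further requests are read from it until the response is sent.
selector.mute(channelId);    // stop reading from this connection
// ... the request is processed and the response is queued ...
selector.unmute(channelId);  // resume reading; the next request leaves the socket buffer
{code}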

> Metadata request should always be allowed to send regardless of value for 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId)) {
>         ..
>     }
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 actually means 
> no out-of-order delivery for the produced records; Metadata requests should 
> have no relation to this config. We don't have to check the in-flight request 
> queue's size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.





[jira] [Comment Edited] (KAFKA-5641) Metadata request should always be allowed to send regardless of value for max.in.flight.requests.per.connection

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101273#comment-16101273
 ] 

Ismael Juma edited comment on KAFKA-5641 at 7/26/17 7:27 AM:
-

My comment was about the broker. And yes, the broker applies head of line 
blocking so that it processes one request at a time for a given connection. If 
you send multiple requests, all but one are kept in the socket buffer. There is 
logic for this in the Selector.


was (Author: ijuma):
My comment was about the broker. And yes, the broker applies head of line 
blocking so that it processes one request at a time. If you send multiple 
requests, all but one are kept in the socket buffer. There is logic for this in 
the Selector.

> Metadata request should always be allowed to send regardless of value for 
> max.in.flight.requests.per.connection
> --
>
> Key: KAFKA-5641
> URL: https://issues.apache.org/jira/browse/KAFKA-5641
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, producer 
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>
> Metadata request might not be sent when 
> `max.in.flight.requests.per.connection` is set to 1 and there is already an 
> inflight request in the same node's queue, as shown below:
> {code:title=NetworkClient.java|borderStyle=solid}
> private long maybeUpdate(long now, Node node) {
>     String nodeConnectionId = node.idString();
>     if (canSendRequest(nodeConnectionId)) {
>         ..
>     }
> }
> {code}
> However, setting `max.in.flight.requests.per.connection` to 1 actually means 
> no out-of-order delivery for the produced records; Metadata requests should 
> have no relation to this config. We don't have to check the in-flight request 
> queue's size when sending a Metadata request.
> [~ijuma] Does it make any sense? If yes, I will work on it.





[jira] [Commented] (KAFKA-5562) Do streams state directory cleanup on a single thread

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101401#comment-16101401
 ] 

ASF GitHub Bot commented on KAFKA-5562:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3516


> Do streams state directory cleanup on a single thread
> -
>
> Key: KAFKA-5562
> URL: https://issues.apache.org/jira/browse/KAFKA-5562
> Project: Kafka
>  Issue Type: Bug
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> Currently in streams we clean up old state directories every so often (as 
> defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the 
> cleanup, which is both unnecessary and can potentially lead to race 
> conditions.
> It would be better to perform the state cleanup on a single thread and only 
> when the {{KafkaStreams}} instance is in a running state.
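
A minimal sketch of the proposed direction (illustrative only; the scheduling 
wiring and the streams/stateDirectory handles are placeholders, not the actual 
patch):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// One dedicated thread owns state-directory cleanup instead of every
// StreamThread racing to run it.
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
long cleanupDelayMs = 600_000L; // state.cleanup.delay.ms
cleaner.scheduleAtFixedRate(() -> {
    if (streams.state() == KafkaStreams.State.RUNNING)    // only clean while running
        stateDirectory.cleanRemovedTasks(cleanupDelayMs); // placeholder handle
}, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
{code}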





[jira] [Created] (KAFKA-5643) Using _DUCKTAPE_OPTIONS has no effect on executing tests

2017-07-26 Thread Paolo Patierno (JIRA)
Paolo Patierno created KAFKA-5643:
-

 Summary: Using _DUCKTAPE_OPTIONS has no effect on executing tests
 Key: KAFKA-5643
 URL: https://issues.apache.org/jira/browse/KAFKA-5643
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Reporter: Paolo Patierno
Assignee: Paolo Patierno


Hi,
as described in the documentation, you should be able to enable debugging using 
the following line:

_DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee debug_logs.txt

Instead, _DUCKTAPE_OPTIONS isn't handled in the run_tests.sh script, so it's 
not passed to ducker-ak and, ultimately, to the ducktape command line.

Thanks,
Paolo.





[jira] [Commented] (KAFKA-5643) Using _DUCKTAPE_OPTIONS has no effect on executing tests

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101465#comment-16101465
 ] 

ASF GitHub Bot commented on KAFKA-5643:
---

GitHub user ppatierno opened a pull request:

https://github.com/apache/kafka/pull/3578

KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests

Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppatierno/kafka kafka-5643

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3578.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3578


commit 02e958a1ee7cdd5b7e81fcec45fba7326a4ac9fa
Author: Paolo Patierno 
Date:   2017-07-26T09:52:52Z

Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging)




> Using _DUCKTAPE_OPTIONS has no effect on executing tests
> 
>
> Key: KAFKA-5643
> URL: https://issues.apache.org/jira/browse/KAFKA-5643
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>
> Hi,
> as described in the documentation, you should be able to enable debugging 
> using the following line:
> _DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee 
> debug_logs.txt
> Instead, _DUCKTAPE_OPTIONS isn't handled in the run_tests.sh script, so it's 
> not passed to ducker-ak and, ultimately, to the ducktape command line.
> Thanks,
> Paolo.





[jira] [Created] (KAFKA-5644) Transient test failure: ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime

2017-07-26 Thread Manikumar (JIRA)
Manikumar created KAFKA-5644:


 Summary: Transient test failure: 
ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime
 Key: KAFKA-5644
 URL: https://issues.apache.org/jira/browse/KAFKA-5644
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Manikumar
Priority: Minor


{quote}
unit.kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToZonedDateTime 
FAILED
java.lang.AssertionError: Expected the consumer group to reset to when 
offset was 50.
at kafka.utils.TestUtils$.fail(TestUtils.scala:339)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:853)
at 
unit.kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime(ResetConsumerGroupOffsetTest.scala:188)
{quote}





[jira] [Commented] (KAFKA-5592) Connection with plain client to SSL-secured broker causes OOM

2017-07-26 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101565#comment-16101565
 ] 

Marcin Łuczyński commented on KAFKA-5592:
-

It's very likely.

> Connection with plain client to SSL-secured broker causes OOM
> -
>
> Key: KAFKA-5592
> URL: https://issues.apache.org/jira/browse/KAFKA-5592
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.11.0.0
> Environment: Linux x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Marcin Łuczyński
> Attachments: heapdump.20170713.100129.14207.0002.phd, Heap.PNG, 
> javacore.20170713.100129.14207.0003.txt, Snap.20170713.100129.14207.0004.trc, 
> Stack.PNG
>
>
> While testing a connection from a client app that has no truststore 
> configured to a Kafka broker secured by SSL, my JVM crashes with 
> OutOfMemoryError. I saw it mixed with StackOverflowError. I attach dump files.
> The stack trace to start with is here:
> {quote}at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57) 
> at java/nio/ByteBuffer.allocate(ByteBuffer.java:331) 
> at 
> org/apache/kafka/common/network/NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>  
> at 
> org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:71)
>  
> at 
> org/apache/kafka/common/network/KafkaChannel.receive(KafkaChannel.java:169) 
> at org/apache/kafka/common/network/KafkaChannel.read(KafkaChannel.java:150) 
> at 
> org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:355) 
> at org/apache/kafka/common/network/Selector.poll(Selector.java:303) 
> at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:349) 
> at 
> org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>  
> at 
> org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188)
>  
> at 
> org/apache/kafka/clients/consumer/internals/AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:207)
>  
> at 
> org/apache/kafka/clients/consumer/internals/AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
>  
> at 
> org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
>  
> at 
> org/apache/kafka/clients/consumer/KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  
> at 
> org/apache/kafka/clients/consumer/KafkaConsumer.poll(KafkaConsumer.java:995) 
> at 
> com/ibm/is/cc/kafka/runtime/KafkaConsumerProcessor.process(KafkaConsumerProcessor.java:237)
>  
> at 
> com/ibm/is/cc/kafka/runtime/KafkaProcessor.process(KafkaProcessor.java:173) 
> at 
> com/ibm/is/cc/javastage/connector/CC_JavaAdapter.run(CC_JavaAdapter.java:443){quote}
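
For context, the likely mechanism (paraphrasing 
NetworkReceive.readFromReadableChannel, which appears at the top of the trace): 
a plaintext client reads the first four bytes of the broker's TLS response as a 
message-size field and then tries to allocate a buffer of that bogus size.

{code:java}
// Simplified sketch: with no SSL configured on the client, TLS handshake
// bytes are misread as a size prefix.
int receiveSize = sizeBuffer.getInt();                // garbage size from TLS bytes
ByteBuffer buffer = ByteBuffer.allocate(receiveSize); // huge allocation -> OutOfMemoryError
{code}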





[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101652#comment-16101652
 ] 

Matthias J. Sax commented on KAFKA-5386:


Understood. Technically, it would be possible to allow users to create the 
changelog topic manually, but there are some strings attached. We have also hit 
issues with the naming conventions multiple times already, so maybe we need to 
do something about it. If you want to work on this, we would be more than 
happy. However, this change would require a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals 
Let us know if you need any help preparing a KIP in case you want to pick it 
up. We can also discuss a little more on this JIRA. \cc [~miguno] [~guozhang] 
[~damianguy] [~enothereska] [~bbejeck]

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka-backed state stores in Kafka Streams, 
> these log-compacted topics are given a hardcoded name: 
> _my.app.id-storename-changelog_
> {noformat}
> public static String storeChangelogTopic(String applicationId, String storeName) {
>     return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }
> {noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?
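
To see the current behavior concretely (the static method quoted above lives 
in ProcessorStateManager; the values are examples):

{code:java}
// With application.id "my.app.id" and a store named "storename", the
// changelog topic name is derived and never user-supplied:
String topic = ProcessorStateManager.storeChangelogTopic("my.app.id", "storename");
// topic == "my.app.id-storename-changelog"
{code}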





[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101659#comment-16101659
 ] 

Matthias J. Sax commented on KAFKA-4327:


It's a Streams tool, so it belongs in package `o.a.k.streams.tools` -- we only 
put it into core because of the ZK dependency, and we did not want to add a ZK 
dependency to the streams module. \cc [~ijuma] [~guozhang] [~ewencp]

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.class.getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support "topic delete request", and 
> thus the reset tool will not be backward compatible with all broker versions.





[jira] [Resolved] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-3210.

Resolution: Won't Fix

We are following a slightly different approach, see KAFKA-5501.

> Using asynchronous calls through the raw ZK API in ZkUtils
> --
>
> Key: KAFKA-3210
> URL: https://issues.apache.org/jira/browse/KAFKA-3210
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>
> We have observed a number of issues with the controller's interaction with 
> ZooKeeper, mainly because ZkClient creates new sessions transparently under 
> the hood. Creating sessions transparently enables, for example, an old 
> controller to successfully update znodes in ZooKeeper even when it isn't 
> the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass 
> the ZkClient lib like we did with ZKWatchedEphemeral.
> In addition to fixing such races with the controller, it would improve 
> performance significantly if we used the async API (see KAFKA-3038). The 
> async API is more efficient because it pipelines the requests to ZooKeeper, 
> and the number of requests upon controller recovery can be large.
> This jira proposes to make these two changes to the calls in ZkUtils and to 
> do it, one path is to first replace the calls in ZkUtils with raw async ZK 
> calls and block so that we don't have to change the controller code in this 
> phase. Once this step is accomplished and it is stable, we make changes to 
> the controller to handle the asynchronous calls to ZooKeeper.
> Note that in the first step, we will need to introduce some new logic for 
> session management, which is currently handled entirely by ZkClient. We will 
> also need to implement the subscription mechanism for event notifications 
> (see ZooKeeperLeaderElector as an example).  





[jira] [Resolved] (KAFKA-5328) consider switching json parser from scala to jackson

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-5328.

Resolution: Duplicate

Duplicate of KAFKA-1595.

> consider switching json parser from scala to jackson
> 
>
> Key: KAFKA-5328
> URL: https://issues.apache.org/jira/browse/KAFKA-5328
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The scala json parser is significantly slower than jackson.
> This can have a nontrivial impact on controller initialization since the 
> controller loads and parses almost all zookeeper state.





[jira] [Updated] (KAFKA-5642) Use async ZookeeperClient in Controller

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5642:
---
Summary: Use async ZookeeperClient in Controller  (was: use async 
ZookeeperClient everywhere)

> Use async ZookeeperClient in Controller
> ---
>
> Key: KAFKA-5642
> URL: https://issues.apache.org/jira/browse/KAFKA-5642
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Synchronous zookeeper writes mean that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.
> KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
> requests to zookeeper. We should replace ZkClient's usage with this client.





[jira] [Created] (KAFKA-5645) Use async ZookeeperClient in SimpleAclAuthorizer

2017-07-26 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-5645:
--

 Summary: Use async ZookeeperClient in SimpleAclAuthorizer
 Key: KAFKA-5645
 URL: https://issues.apache.org/jira/browse/KAFKA-5645
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma








[jira] [Created] (KAFKA-5646) Use async ZookeeperClient for Config and ISR management

2017-07-26 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-5646:
--

 Summary: Use async ZookeeperClient for Config and ISR management
 Key: KAFKA-5646
 URL: https://issues.apache.org/jira/browse/KAFKA-5646
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma








[jira] [Created] (KAFKA-5647) Use async ZookeeperClient for Admin operations

2017-07-26 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-5647:
--

 Summary: Use async ZookeeperClient for Admin operations
 Key: KAFKA-5647
 URL: https://issues.apache.org/jira/browse/KAFKA-5647
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma








[jira] [Commented] (KAFKA-5642) Use async ZookeeperClient in Controller

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101713#comment-16101713
 ] 

Ismael Juma commented on KAFKA-5642:


[~onurkaraman], I took a stab at splitting this JIRA into multiple ones. I 
think it will be easier to review if we do it this way instead of a single PR. 
If you have other ideas on how we should do it, happy to update things.

> Use async ZookeeperClient in Controller
> ---
>
> Key: KAFKA-5642
> URL: https://issues.apache.org/jira/browse/KAFKA-5642
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Synchronous zookeeper writes mean that we wait an entire round trip before 
> doing the next write. These synchronous writes are happening at a 
> per-partition granularity in several places, so partition-heavy clusters 
> suffer from the controller doing many sequential round trips to zookeeper.
> * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
> zookeeper on transition to OnlinePartition. This gets triggered per-partition 
> sequentially with synchronous writes during controlled shutdown of the 
> shutting down broker's replicas for which it is the leader.
> * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
> OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
> triggered per-partition sequentially with synchronous writes for failed or 
> controlled shutdown brokers.
> KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
> requests to zookeeper. We should replace ZkClient's usage with this client.





[jira] [Commented] (KAFKA-3038) Speeding up partition reassignment after broker failure

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101716#comment-16101716
 ] 

ASF GitHub Bot commented on KAFKA-3038:
---

Github user resetius closed the pull request at:

https://github.com/apache/kafka/pull/2213


> Speeding up partition reassignment after broker failure
> ---
>
> Key: KAFKA-3038
> URL: https://issues.apache.org/jira/browse/KAFKA-3038
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 0.9.0.0
>Reporter: Eno Thereska
>
> After a broker failure the controller does several writes to Zookeeper for 
> each partition on the failed broker. Writes are done one at a time, in a 
> closed loop, which is slow, especially under high-latency networks. Zookeeper 
> has support for batching operations (the "multi" API). It is expected that 
> substituting serial writes with batched ones should reduce failure-handling 
> time by an order of magnitude.
> This is identified as an issue in 
> https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
>  (section End-to-end latency during a broker failure)
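
For illustration, the "multi" API referenced above batches several znode 
updates into a single atomic round trip (zk and the state bytes are 
placeholders):

{code:java}
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;

List<OpResult> results = zk.multi(Arrays.asList(
    Op.setData("/brokers/topics/t/partitions/0/state", state0Bytes, -1),
    Op.setData("/brokers/topics/t/partitions/1/state", state1Bytes, -1)));
{code}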





[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101715#comment-16101715
 ] 

ASF GitHub Bot commented on KAFKA-1595:
---

Github user resetius closed the pull request at:

https://github.com/apache/kafka/pull/2214


> Remove deprecated and slower scala JSON parser
> --
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Jagbir
>Assignee: Ismael Juma
>  Labels: newbie
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On the consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks the consumer thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0xd3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 
> 0xd3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
> 
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$ano

[jira] [Created] (KAFKA-5648) make Merger extend Aggregator

2017-07-26 Thread Clemens Valiente (JIRA)
Clemens Valiente created KAFKA-5648:
---

 Summary: make Merger extend Aggregator
 Key: KAFKA-5648
 URL: https://issues.apache.org/jira/browse/KAFKA-5648
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Clemens Valiente
Assignee: Clemens Valiente
Priority: Minor


Hi,

I suggest that Merger should extend Aggregator.
reason:
Both classes usually do very similar things. A merger takes two sessions and 
combines them, an aggregator takes an existing session and aggregates new 
values into it.
In some use cases it is actually the same thing, e.g.:
<K, V> -> .map() to <K, List<V>> -> .groupByKey().aggregate() to <K, List<V>>
In this case both merger and aggregator do the same thing: take two lists and 
combine them into one.
With the proposed change we could pass the Merger as both the merger and 
aggregator to the .aggregate() method and keep our business logic within one 
merger class.

Or in other words: the Merger is simply an Aggregator that happens to aggregate 
two objects of the same class.
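
A sketch of the suggestion (the two interfaces mirror the 0.11.0 streams 
signatures; the extends clause is the proposed change, not current Kafka):

{code:java}
public interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

// Proposed: a Merger is just an Aggregator whose value and aggregate types
// coincide, so a single implementation could be passed as both the merger
// and the aggregator arguments of aggregate().
public interface Merger<K, V> extends Aggregator<K, V, V> {
    @Override
    V apply(K aggKey, V aggOne, V aggTwo);
}
{code}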






[jira] [Created] (KAFKA-5649) Producer is being closed generating ssl exception

2017-07-26 Thread Pablo Panero (JIRA)
Pablo Panero created KAFKA-5649:
---

 Summary: Producer is being closed generating ssl exception
 Key: KAFKA-5649
 URL: https://issues.apache.org/jira/browse/KAFKA-5649
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.10.2.1
 Environment: Spark 2.2.0 and kafka 0.10.2.0
Reporter: Pablo Panero
Priority: Minor


On a streaming job using the built-in kafka source and sink (over SSL), I am 
getting the following exception:

Config of the source:

{code:java}
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.bootstrapServers)
  .option("failOnDataLoss", value = false)
  .option("kafka.connections.max.idle.ms", 360)
  //SSL: this only applies to communication between Spark and Kafka 
brokers; you are still responsible for separately securing Spark inter-node 
communication.
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
  .option("kafka.ssl.truststore.password", "changeit")
  .option("subscribe", config.topicConfigList.keys.mkString(","))
  .load()
{code}

Config of the sink:


{code:java}
.writeStream
.option("checkpointLocation", 
s"${config.checkpointDir}/${topicConfig._1}/")
.format("kafka")
.option("kafka.bootstrap.servers", config.bootstrapServers)
.option("kafka.connections.max.idle.ms", 360)
//SSL: this only applies to communication between Spark and Kafka 
brokers; you are still responsible for separately securing Spark inter-node 
communication.
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
.option("kafka.ssl.truststore.password", "changeit")
.start()
{code}


{code:java}
17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.

[jira] [Created] (KAFKA-5650) Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5650:
-

 Summary: Provide a simple way for custom storage engines to use 
streams wrapped stores (KIP-182)
 Key: KAFKA-5650
 URL: https://issues.apache.org/jira/browse/KAFKA-5650
 Project: Kafka
  Issue Type: Bug
Reporter: Damian Guy
Assignee: Damian Guy


As per KIP-182:
A new interface will be added:
{code}
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/logging etc.
 */
public interface StateStoreBuilder<T extends StateStore> {
 
    StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withCachingDisabled();
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StateStoreBuilder<T> withLoggingDisabled();
    T build();
}
{code}

This interface will be used to wrap stores with caching, logging etc.
Additionally some convenience methods on the {{Stores}} class:

{code}
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
                                                                                     final Serde<K> keySerde,
                                                                                     final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
                                                                                   final Serde<K> keySerde,
                                                                                   final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
                                                                    final int capacity,
                                                                    final Serde<K> keySerde,
                                                                    final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
                                                                                 final Windows windows,
                                                                                 final Serde<K> keySerde,
                                                                                 final Serde<V> valueSerde)
 
public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
                                                                                   final SessionWindows windows,
                                                                                   final Serde<K> keySerde,
                                                                                   final Serde<V> valueSerde)
 
/**
 *  The following methods are for use with the PAPI. They allow building of
 *  StateStores that can be wrapped with caching, logging, and any other
 *  convenient wrappers provided by the KafkaStreams library.
 */ 
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)
 
public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)
 
public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
{code}
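
A hypothetical usage of the proposed builder, assuming the signatures above 
(illustrative only; KIP-182 was still under discussion at this point):

{code}
// Build a persistent key-value store supplier, then wrap it with caching
// and a compacted changelog before materializing it.
StateStoreBuilder<KeyValueStore<String, Long>> builder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("counts", Serdes.String(), Serdes.Long()));

KeyValueStore<String, Long> store = builder
    .withCachingEnabled()
    .withLoggingEnabled(Collections.singletonMap("cleanup.policy", "compact"))
    .build();
{code}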





[jira] [Updated] (KAFKA-5649) Producer is being closed generating ssl exception

2017-07-26 Thread Pablo Panero (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pablo Panero updated KAFKA-5649:

Description: 
On a streaming job using the built-in kafka source and sink (over SSL), I am 
getting the following exception:

Config of the source:

{code:java}
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.bootstrapServers)
  .option("failOnDataLoss", value = false)
  .option("kafka.connections.max.idle.ms", 360)
  //SSL: this only applies to communication between Spark and Kafka 
brokers; you are still responsible for separately securing Spark inter-node 
communication.
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
  .option("kafka.ssl.truststore.password", "changeit")
  .option("subscribe", config.topicConfigList.keys.mkString(","))
  .load()
{code}

Config of the sink:


{code:java}
.writeStream
.option("checkpointLocation", 
s"${config.checkpointDir}/${topicConfig._1}/")
.format("kafka")
.option("kafka.bootstrap.servers", config.bootstrapServers)
.option("kafka.connections.max.idle.ms", 360)
//SSL: this only applies to communication between Spark and Kafka 
brokers; you are still responsible for separately securing Spark inter-node 
communication.
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
.option("kafka.ssl.truststore.password", "changeit")
.start()
{code}

And in some cases it throws the exception, leaving the Spark job stuck at that 
step. The exception stack trace is the following:

{code:java}
17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.ap

[jira] [Created] (KAFKA-5651) KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5651:
-

 Summary: KIP-182: Reduce Streams DSL overloads and allow easier 
use of custom storage engines
 Key: KAFKA-5651
 URL: https://issues.apache.org/jira/browse/KAFKA-5651
 Project: Kafka
  Issue Type: New Feature
Reporter: Damian Guy
Assignee: Damian Guy








[jira] [Updated] (KAFKA-5650) Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)

2017-07-26 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-5650:
--
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-5651

> Provide a simple way for custom storage engines to use streams wrapped stores 
> (KIP-182)
> ---
>
> Key: KAFKA-5650
> URL: https://issues.apache.org/jira/browse/KAFKA-5650
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> As per KIP-182:
> A new interface will be added:
> {code}
> /**
>  * Implementations of this will provide the ability to wrap a given StateStore
>  * with or without caching/logging etc.
>  */
> public interface StateStoreBuilder<T extends StateStore> {
>  
>     StateStoreBuilder<T> withCachingEnabled();
>     StateStoreBuilder<T> withCachingDisabled();
>     StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
>     StateStoreBuilder<T> withLoggingDisabled();
>     T build();
> }
> {code}
> This interface will be used to wrap stores with caching, logging etc.
> Additionally some convenience methods on the {{Stores}} class:
> {code}
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
>                                                                                      final Serde<K> keySerde,
>                                                                                      final Serde<V> valueSerde)
>  
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
>                                                                                    final Serde<K> keySerde,
>                                                                                    final Serde<V> valueSerde)
>  
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
>                                                                     final int capacity,
>                                                                     final Serde<K> keySerde,
>                                                                     final Serde<V> valueSerde)
>  
> public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
>                                                                                  final Windows windows,
>                                                                                  final Serde<K> keySerde,
>                                                                                  final Serde<V> valueSerde)
>  
> public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
>                                                                                    final SessionWindows windows,
>                                                                                    final Serde<K> keySerde,
>                                                                                    final Serde<V> valueSerde)
>  
> /**
>  *  The following methods are for use with the PAPI. They allow building of
>  *  StateStores that can be wrapped with caching, logging, and any other
>  *  convenient wrappers provided by the KafkaStreams library.
>  */ 
> public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)
>  
> public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)
>  
> public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
> {code}





[jira] [Created] (KAFKA-5652) Add new api methods to KStream

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5652:
-

 Summary: Add new api methods to KStream
 Key: KAFKA-5652
 URL: https://issues.apache.org/jira/browse/KAFKA-5652
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy
Assignee: Damian Guy


Add new methods from KIP-182 to {{KStream}}.
Placeholder until the API is finalized.





[jira] [Created] (KAFKA-5653) Add new API methods to KTable

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5653:
-

 Summary: Add new API methods to KTable
 Key: KAFKA-5653
 URL: https://issues.apache.org/jira/browse/KAFKA-5653
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy


placeholder until API finalized





[jira] [Created] (KAFKA-5654) Add new API methods to KGroupedStream

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5654:
-

 Summary: Add new API methods to KGroupedStream
 Key: KAFKA-5654
 URL: https://issues.apache.org/jira/browse/KAFKA-5654
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy
Assignee: Damian Guy


Placeholder until API finalized





[jira] [Created] (KAFKA-5655) Add new API methods to KGroupedTable

2017-07-26 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-5655:
-

 Summary: Add new API methods to KGroupedTable
 Key: KAFKA-5655
 URL: https://issues.apache.org/jira/browse/KAFKA-5655
 Project: Kafka
  Issue Type: Sub-task
Reporter: Damian Guy


Placeholder until API finalized





[jira] [Created] (KAFKA-5656) Support bulk attributes request on KafkaMbean where some attributes do not exist

2017-07-26 Thread Erik Kringen (JIRA)
Erik Kringen created KAFKA-5656:
---

 Summary: Support bulk attributes request on KafkaMbean where some 
attributes do not exist
 Key: KAFKA-5656
 URL: https://issues.apache.org/jira/browse/KAFKA-5656
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Erik Kringen
Priority: Minor


According to Oracle documentation on [Implementing a Dynamic 
MBean|http://docs.oracle.com/cd/E19698-01/816-7609/6mdjrf83d/index.html] 

bq. The bulk getter and setter methods usually rely on the generic getter and 
setter, respectively. This makes them independent of the management interface, 
which can simplify certain modifications. In this case, their implementation 
consists mostly of error checking on the list of attributes. However, all bulk 
getters and setters must be implemented so that an error on any one attribute 
does not interrupt or invalidate the bulk operation on the other attributes.

bq. If an attribute cannot be read, then its name-value pair is not included in 
the list of results. If an attribute cannot be written, it will not be copied 
to the returned list of successful set operations. As a result, if there are 
any errors, the lists returned by bulk operators will not have the same length 
as the array or list passed to them. In any case, the bulk operators do not 
guarantee that their returned lists have the same ordering of attributes as the 
input array or list.

The current implementation of 
{code}org.apache.kafka.common.metrics.JmxReporter.KafkaMbean#getAttributes{code}
 returns an empty list if any of the requested attributes are not found.

This method should instead log the exception but allow all requested attributes 
that are present to be returned, as prescribed by the DynamicMBean interface.
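
For illustration, a minimal sketch of such a tolerant bulk getter (this is not 
the actual JmxReporter code; {{getAttribute()}} and {{log}} stand in for the 
MBean's own single-attribute getter and its logger):

{code}
import javax.management.Attribute;
import javax.management.AttributeList;

public AttributeList getAttributes(String[] names) {
    AttributeList list = new AttributeList();
    for (String name : names) {
        try {
            list.add(new Attribute(name, getAttribute(name)));
        } catch (Exception e) {
            // Per the DynamicMBean contract, an attribute that cannot be
            // read is omitted rather than failing the whole bulk call.
            log.warn("Error getting JMX attribute '{}'", name, e);
        }
    }
    return list;
}
{code}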



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-07-26 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101932#comment-16101932
 ] 

Jiangjie Qin commented on KAFKA-5621:
-

[~ijuma] [~apurva] The expiration for messages in the accumulator was not for 
memory footprint control, but for making progress when a partition is stuck. 
For example, if the leader of a partition becomes unavailable for some reason, 
the records in the accumulator cannot be sent. Retry only makes sense when the 
producer can actually retry, so we have to expire the records at some point 
when that partition cannot make progress; whether they should expire after 
request_timeout or retries * request_timeout could be discussed. But note that 
clients sometimes set retries to Integer.MAX_VALUE, which would also result in 
unexpected behavior.

The reasons for having an explicit batch.expiry.ms are: 1) We have already 
exposed the concept of batching to users through batch.size and linger.ms, so 
users know the producer sends batches; no new concept is added. 2) If a record 
has been sitting in the record accumulator for more than batch.expiry.ms, 
there is likely a very long queue or the producer cannot make progress, so 
users may want to get an exception and act on it. This expiration time is a 
kind of SLO and is not necessarily related to request_timeout * retries, which 
is intended for the remote call, so decoupling them would be useful.
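
For reference, a sketch of the knobs under discussion (the three 
{{ProducerConfig}} entries exist today; {{batch.expiry.ms}} is the proposed 
new setting from this thread, not something the producer currently supports):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);          // exposed batching knob
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // exposed batching knob
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);  // intended for the remote call
// Proposed in this thread: an SLO on time spent in the accumulator,
// decoupled from request_timeout * retries.
props.put("batch.expiry.ms", 120000);
{code}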

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch, up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5656) Support bulk attributes request on KafkaMbean where some attributes do not exist

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101941#comment-16101941
 ] 

ASF GitHub Bot commented on KAFKA-5656:
---

GitHub user ErikKringen opened a pull request:

https://github.com/apache/kafka/pull/3582

KAFKA-5656: Support bulk attributes request on KafkaMbean where some 
Support bulk attributes request on KafkaMbean where some attributes do not exist



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ErikKringen/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3582.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3582


commit 0febcdd59cee9e1f34bdd9646aee59944c28386e
Author: Erik.Kringen 
Date:   2017-07-26T17:12:04Z

KAFKA-5656: Support bulk attributes request on KafkaMbean where some 
attributes do not exist




> Support bulk attributes request on KafkaMbean where some attributes do not 
> exist
> 
>
> Key: KAFKA-5656
> URL: https://issues.apache.org/jira/browse/KAFKA-5656
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Erik Kringen
>Priority: Minor
>
> According to Oracle documentation on [Implementing a Dynamic 
> MBean|http://docs.oracle.com/cd/E19698-01/816-7609/6mdjrf83d/index.html] 
> bq. The bulk getter and setter methods usually rely on the generic getter and 
> setter, respectively. This makes them independent of the management 
> interface, which can simplify certain modifications. In this case, their 
> implementation consists mostly of error checking on the list of attributes. 
> However, all bulk getters and setters must be implemented so that an error on 
> any one attribute does not interrupt or invalidate the bulk operation on the 
> other attributes.
> bq. If an attribute cannot be read, then its name-value pair is not included 
> in the list of results. If an attribute cannot be written, it will not be 
> copied to the returned list of successful set operations. As a result, if 
> there are any errors, the lists returned by bulk operators will not have the 
> same length as the array or list passed to them. In any case, the bulk 
> operators do not guarantee that their returned lists have the same ordering 
> of attributes as the input array or list.
> The current implementation of 
> {code}org.apache.kafka.common.metrics.JmxReporter.KafkaMbean#getAttributes{code}
>  returns an empty list if any of the requested attributes are not found.
> This method should instead log the exception but allow all requested 
> attributes that are present to be returned, as prescribed by the 
> DynamicMBean interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101966#comment-16101966
 ] 

Ewen Cheslack-Postava commented on KAFKA-4327:
--

[~mjsax] We'd have to decide whether the Java API for the command is considered 
public, or just the shell script that executes it. If it is public, we could 
move the tool but would want some deprecation period for the original, with 
some logging about the deprecation. But otherwise I agree: the natural home 
for the tool is in streams.


> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" 
> and thus the reset tool will not be backward compatible with all broker 
> versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5341) Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101975#comment-16101975
 ] 

ASF GitHub Bot commented on KAFKA-5341:
---

GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/3583

KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr 
metrics (KIP-164)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-5341

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3583


commit 93f541c249def6bdc158cb79592278d18cdf3ff8
Author: Dong Lin 
Date:   2017-05-28T08:10:28Z

KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr 
metrics (KIP-164)




> Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics
> ---
>
> Key: KAFKA-5341
> URL: https://issues.apache.org/jira/browse/KAFKA-5341
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> We currently have an under-replicated partitions metric, but we do not have a 
> metric to track the number of partitions whose in-sync replica count < minIsr. 
> Partitions whose in-sync replica count < minIsr will be unavailable to 
> producers who use acks = all. It is important for Kafka operators to be 
> notified of the existence of such partitions because their existence reduces 
> the availability of the Kafka service.
> More specifically, we can define a per-broker metric 
> UnderMinIsrPartitionCount as "The number of partitions that this broker leads 
> for which in-sync replicas count < minIsr." So if the RF was 3, and min ISR 
> is 2, then when there are 2 replicas in ISR this partition would be in the 
> under replicated partitions count. When there is 1 replica in ISR, this 
> partition would also be in the UnderMinIsrPartitionCount.
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics
>  for more detail.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-07-26 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102096#comment-16102096
 ] 

Apurva Mehta commented on KAFKA-5621:
-

> The expiration for messages in the accumulator was not for memory footprint 
> control, but for making progress when a partition is stuck

This is a fair point. Since we have one queue per partition, there is no real 
reason why a single stuck partition should affect the others. I think what 
happens today is that the next {{producer.send}} to the stuck partition will 
just block because there is no space remaining, and that affects everything 
else. From this point of view, expiring accumulator batches does make sense. 

Is that right? 

On your second point of the concept of a batch being exposed, you are right, of 
course. Tuning your producer batches is an important part of getting the best 
performance out of kafka. We have the {{linger.ms}} and {{batch.size}} configs 
to control this and these are fairly intuitive to use. As a user, you would 
think "If I have an expected X byte/s steady state throughput and I want 
batches of Y bytes, I should set linger to Zms to make sure I get my optimal 
batching".  

In this context, exposing the notion of a client side record accumulator with 
batch expiry _is_ a new concept in my opinion. 



> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch, up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5648) make Merger extend Aggregator

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102119#comment-16102119
 ] 

Matthias J. Sax commented on KAFKA-5648:


Your observation is correct, that {{Merger}} and {{Aggregator}} are similar. 
You also stated correctly that the types are different though: the {{Merger}} 
merges two aggregates of the same type, while the {{Aggregator}} in general 
merges a single value (of type A) into an aggregate (of type B). Thus, 
{{Merger}} corresponds to an {{Aggregator}} whose value and aggregate type 
parameters are the same type.

> make Merger extend Aggregator
> -
>
> Key: KAFKA-5648
> URL: https://issues.apache.org/jira/browse/KAFKA-5648
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Clemens Valiente
>Assignee: Clemens Valiente
>Priority: Minor
>
> Hi,
> I suggest that Merger should extend Aggregator.
> reason:
> Both classes usually do very similar things. A merger takes two sessions and 
> combines them, an aggregator takes an existing session and aggregates new 
> values into it.
> in some use cases it is actually the same thing, e.g.:
> {{<K, V>}} -> .map() to {{<K, List<V>>}} -> 
> .groupByKey().aggregate() to {{<K, List<V>>}}
> In this case both merger and aggregator do the same thing: take two lists and 
> combine them into one.
> With the proposed change we could pass the Merger as both the merger and 
> aggregator to the .aggregate() method and keep our business logic within one 
> merger class.
> Or in other words: The Merger is simply an Aggregator that happens to 
> aggregate two objects of the same class
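
For illustration, a sketch of the relationship the description proposes, with 
the two interfaces reduced to their apply methods (signatures abbreviated from 
the Kafka Streams API):

{code}
// Aggregator folds a value of type V into an aggregate of type VA;
// a Merger is then the special case where V and VA coincide.
interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

interface Merger<K, V> extends Aggregator<K, V, V> {
    // Merging two aggregates of the same type is aggregation with V == VA,
    // so the inherited apply(K, V, V) already has the right shape.
}
{code}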



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102134#comment-16102134
 ] 

Matthias J. Sax commented on KAFKA-4327:


It is definitely part of the public API, and I am fine with keeping the current 
one as deprecated. On the other hand, it has the annotation 
{{@InterfaceStability.Unstable}}, and thus we could also remove it without 
further deprecation. I cannot judge whether people use the class from Java code 
or only from the command line. So I am fine either way (direct move or 
deprecating).

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" 
> and thus the reset tool will not be backward compatible with all broker 
> versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102147#comment-16102147
 ] 

Ismael Juma commented on KAFKA-4327:


We discussed this at the time and we documented that it's _not_ part of public 
API so that we could move it to the right location:

{code}
This class is not part of public API. For backward compatibility, use 
the provided script in "bin/" instead of calling this class directly from your 
code.
{code}

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" 
> and thus the reset tool will not be backward compatible with all broker 
> versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5657) Connect REST API should include the connector type when describing a connector

2017-07-26 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-5657:


 Summary: Connect REST API should include the connector type when 
describing a connector
 Key: KAFKA-5657
 URL: https://issues.apache.org/jira/browse/KAFKA-5657
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Randall Hauch
 Fix For: 1.0.0


Kafka Connect's REST API's {{connectors/}} and {{connectors/{name}}} endpoints 
should include whether the connector is a source or a sink.

See KAFKA-4343 and KIP-151 for the related modification of the 
{{connector-plugins}} endpoint.

Also see KAFKA-4279 for converter-related endpoints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4279) REST endpoint to list converter plugins

2017-07-26 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-4279:
-
Labels: needs-kip newbie  (was: )

> REST endpoint to list converter plugins
> ---
>
> Key: KAFKA-4279
> URL: https://issues.apache.org/jira/browse/KAFKA-4279
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We have a REST resource that allows users to see the available plugins, but 
> we have no equivalent that allows listing available converters.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5657) Connect REST API should include the connector type when describing a connector

2017-07-26 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-5657:
-
Description: 
Kafka Connect's REST API's {{connectors/}} and {{connectors/\{name\}}} 
endpoints should include whether the connector is a source or a sink.

See KAFKA-4343 and KIP-151 for the related modification of the 
{{connector-plugins}} endpoint.

Also see KAFKA-4279 for converter-related endpoints.

  was:
Kafka Connect's REST API's {{connectors/}} and {{connectors/{name}}} endpoints 
should include whether the connector is a source or a sink.

See KAFKA-4343 and KIP-151 for the related modification of the 
{{connector-plugins}} endpoint.

Also see KAFKA-4279 for converter-related endpoints.


> Connect REST API should include the connector type when describing a connector
> --
>
> Key: KAFKA-5657
> URL: https://issues.apache.org/jira/browse/KAFKA-5657
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>  Labels: needs-kip, newbie
> Fix For: 1.0.0
>
>
> Kafka Connect's REST API's {{connectors/}} and {{connectors/\{name\}}} 
> endpoints should include whether the connector is a source or a sink.
> See KAFKA-4343 and KIP-151 for the related modification of the 
> {{connector-plugins}} endpoint.
> Also see KAFKA-4279 for converter-related endpoints.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-07-26 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102176#comment-16102176
 ] 

Jiangjie Qin commented on KAFKA-5621:
-

[~apurva] Yes, I agree that the expiry ms is a new concept in the sense that it 
is an additional thing users may need to think about, i.e. "If I have a 
partition unavailable temporarily, how long am I willing to wait for it to come 
back?" Arguably this can also be derived from the request timeout and retries. 
But the difference here is that those two configs are primarily for other 
cases, and in practice we found it quite tricky (if possible at all) to get 
them right for batch expiration.

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch, up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change

2017-07-26 Thread Per Steffensen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102200#comment-16102200
 ] 

Per Steffensen commented on KAFKA-5505:
---

bq. There's been some discussion about more incremental rebalancing, but as you 
add/remove tasks, there's no way to avoid the fact that to keep the work 
balanced we may need to stop/start/move some tasks

I can handle restarts of tasks! It is just a significant overhead if it happens 
all the time - and it does, as long as all tasks are restarted every time the 
set of tasks changes. It would not be a problem if it happened "from time to 
time" due to rebalance. What annoys me most is actually that the connector 
itself is restarted when the set of tasks changes - there is no good reason 
for that at all, as I see it. The problem is that it takes some time before my 
connector can build up its set of tasks after it (re)starts, because it has 
to talk to other components to get the entire set of tasks. But the connector 
has to provide a set of tasks almost immediately after (re)start, or things 
start behaving strangely. Therefore my connector has to start out saying that 
its set of tasks is empty, and then change the set of tasks (calling 
context.requestTaskReconfiguration) along the way, as it learns about more and 
more tasks. But when it does so, the connector itself is restarted, and starts 
over with an empty set of tasks. It makes the process go: connector started -> 
empty set of tasks -> some tasks -> connector restarted -> empty set of tasks 
-> some tasks -> connector restarted -> ... I really have to hack around it to 
make it work.
If we could just make a change where the connector is not restarted, when it 
changes its set of tasks, it will be a big step.

bq. Can you explain why you have task sets changing so frequently?

Ohhh, it is a fairly long explanation in my case. But in general I do not have 
a hard time imagining connectors with a changing set of tasks. I believe you 
already have a source-connector out of the box that can copy from a relational 
database table. Imagine that you would like to extend it to copy all tables of 
that database, running one task per table. I guess that would be a fairly 
reasonable extension. If the set of tables changes often, the set of tasks of 
this connector would change often.

bq. It's possible that a different way of assigning partitions to tasks might 
avoid rebalancing all the time.

Well, I did that for now. Actually, I changed it so that I always have exactly 
one task, and inside that single task I handle all the stuff that would 
otherwise be distributed between tasks. My single task runs one thread per 
"partition in the source" - basically one thread where I would like to have had 
one task. It works the same, but it will not scale, because one task has to run 
on one machine. Being able to split into several tasks will help scale. One 
machine will definitely be able to handle one "partition in the source", but it 
may not be able to handle "all partitions in the source".
I could also take this principle and scale to another fixed number (N) of 
tasks, higher than one. Then task no. M (M from 0 to N-1) will handle the 
"partitions in the source" P where hash(id-of-P) modulo N is M.

So I have ways around the problem, but I think the requested change would be 
nice in general, and something people will expect to be available, especially 
since it is possible to change the set of tasks along the way - I know I was 
surprised that it did not already work as I request here.

> Connect: Do not restart connector and existing tasks on task-set change
> ---
>
> Key: KAFKA-5505
> URL: https://issues.apache.org/jira/browse/KAFKA-5505
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Per Steffensen
>
> I am writing a connector with a frequently changing task-set. It is really 
> not working very well, because the connector and all existing tasks are 
> restarted when the set of tasks changes. E.g. if the connector is running 
> with 10 tasks, and an additional task is needed, the connector itself and all 
> 10 existing tasks are restarted, just to make the 11th task run also. My 
> tasks have a fairly heavy initialization, making it extra annoying. I would 
> like to see a change, introducing a "mode", where only new/deleted tasks are 
> started/stopped when notifying the system that the set of tasks changed 
> (calling context.requestTaskReconfiguration() - or something similar).
> Discussed this issue a little on d...@kafka.apache.org in the thread "Kafka 
> Connect: To much restarting with a SourceConnector with dynamic set of tasks"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread dan norwood (JIRA)
dan norwood created KAFKA-5658:
--

 Summary: adminclient will stop working after some amount of time
 Key: KAFKA-5658
 URL: https://issues.apache.org/jira/browse/KAFKA-5658
 Project: Kafka
  Issue Type: Bug
Reporter: dan norwood


If I create an admin client and let it sit unused for some amount of time, then 
attempt to use it, I will get the following 

{noformat}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.BrokerNotAvailableException
{noformat}

even though the broker is up. If I create a new admin client before each usage, 
I do not see the same behavior.
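
For illustration, a minimal repro sketch along those lines (assumes a broker on 
localhost:9092; the idle period is arbitrary):

{code}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class IdleAdminClientRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Thread.sleep(10 * 60 * 1000L);     // let the client sit unused
            admin.listTopics().names().get();  // reportedly fails as above
        }
    }
}
{code}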



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5658:
---
Labels: reliability  (was: )

> adminclient will stop working after some amount of time
> ---
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If I create a new admin client before each 
> usage, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5658:
---
Priority: Critical  (was: Major)

> adminclient will stop working after some amount of time
> ---
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If I create a new admin client before each 
> usage, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5658:
---
Fix Version/s: 0.11.0.1

> adminclient will stop working after some amount of time
> ---
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If I create a new admin client before each 
> usage, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5658) adminclient will stop working after some amount of time

2017-07-26 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-5658:
--

Assignee: Colin P. McCabe

> adminclient will stop working after some amount of time
> ---
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Assignee: Colin P. McCabe
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If I create a new admin client before each 
> usage, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

2017-07-26 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-5658:
---
Summary: Fix AdminClient request timeout handling bug resulting in 
continual BrokerNotAvailableExceptions  (was: adminclient will stop working 
after some amount of time)

> Fix AdminClient request timeout handling bug resulting in continual 
> BrokerNotAvailableExceptions
> 
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Assignee: Colin P. McCabe
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> If I create an admin client and let it sit unused for some amount of time, 
> then attempt to use it, I will get the following 
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException
> {noformat}
> even though the broker is up. If I create a new admin client before each 
> usage, I do not see the same behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

2017-07-26 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-5658:
---
Description: The AdminClient does not properly clear calls from the 
callsInFlight structure.  Later, in an effort to clear the lingering call 
objects, it closes the connection they are associated with.  This disrupts new 
incoming calls, which then get {{java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.BrokerNotAvailableException}}.  (was: if i 
create an admin client and let it sit unused for some amount of time, then 
attempt to use it i will get the following 

{noformat}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.BrokerNotAvailableException
{noformat}

even though the broker is up. if before each usage of adminclient i create a 
new admin client i do not see the same behavior.)

> Fix AdminClient request timeout handling bug resulting in continual 
> BrokerNotAvailableExceptions
> 
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Assignee: Colin P. McCabe
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> The AdminClient does not properly clear calls from the callsInFlight 
> structure.  Later, in an effort to clear the lingering call objects, it 
> closes the connection they are associated with.  This disrupts new incoming 
> calls, which then get {{java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102290#comment-16102290
 ] 

ASF GitHub Bot commented on KAFKA-5658:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/3584

KAFKA-5658. Fix AdminClient request timeout handling bug resulting in 
continual BrokerNotAvailableExceptions

The AdminClient does not properly clear calls from the callsInFlight 
structure. Later, in an effort to clear the lingering call objects, it closes 
the connection they are associated with. This disrupts new incoming calls, 
which then get BrokerNotAvailableException.

This patch fixes this bug by properly removing completed calls from the 
callsInFlight structure. It also adds the Call#aborted flag, which ensures 
that we only abort a connection once -- even if there is a similar bug in the 
future which causes old Call objects to linger. This patch also fixes a case 
where AdminClient#describeConfigs was making an extra RPC that had no useful 
effect.
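
A rough sketch of the once-only abort idea described above (the Call and 
aborted names come from the PR description; the body is illustrative, not the 
actual patch):

{code}
class Call {
    private boolean aborted = false;

    // Returns true only on the first invocation, so a lingering Call can
    // trigger at most one connection abort even if a future bug leaves
    // stale Call objects behind again.
    synchronized boolean tryAbort() {
        if (aborted)
            return false;
        aborted = true;
        return true;
    }
}
{code}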

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5658

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3584.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3584


commit 7eedf51d2b29565460f04f78435f2bdf5a9cd661
Author: Colin P. Mccabe 
Date:   2017-07-17T17:04:58Z

KAFKA-5602: ducker-ak: support --custom-ducktape

Support a --custom-ducktape flag which allows developers to install
their own versions of ducktape into Docker images.  This is helpful for
ducktape development.

commit 811983f02cb1ff887bbe75ffc22ef51f98a99a36
Author: Colin P. Mccabe 
Date:   2017-07-26T20:57:18Z

KAFKA-5658. Fix AdminClient request timeout handling bug resulting in 
continual BrokerNotAvailableExceptions




> Fix AdminClient request timeout handling bug resulting in continual 
> BrokerNotAvailableExceptions
> 
>
> Key: KAFKA-5658
> URL: https://issues.apache.org/jira/browse/KAFKA-5658
> Project: Kafka
>  Issue Type: Bug
>Reporter: dan norwood
>Assignee: Colin P. McCabe
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> The AdminClient does not properly clear calls from the callsInFlight 
> structure.  Later, in an effort to clear the lingering call objects, it 
> closes the connection they are associated with.  This disrupts new incoming 
> calls, which then get {{java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.BrokerNotAvailableException}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5659) AdminClient#describeConfigs makes an extra empty request when only broker info is requested

2017-07-26 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-5659:
--

 Summary: AdminClient#describeConfigs makes an extra empty request 
when only broker info is requested
 Key: KAFKA-5659
 URL: https://issues.apache.org/jira/browse/KAFKA-5659
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


AdminClient#describeConfigs makes an extra empty request when only broker info 
is requested



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5659) AdminClient#describeConfigs makes an extra empty request when only broker info is requested

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102309#comment-16102309
 ] 

ASF GitHub Bot commented on KAFKA-5659:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/3585

KAFKA-5659. AdminClient#describeConfigs makes an extra empty request …

…when only broker info is requested

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-5659

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3585.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3585


commit 593ed34ce08c19a34f2146999f9e07a02927ca61
Author: Colin P. Mccabe 
Date:   2017-07-26T21:20:12Z

KAFKA-5659. AdminClient#describeConfigs makes an extra empty request when 
only broker info is requested




> AdminClient#describeConfigs makes an extra empty request when only broker 
> info is requested
> ---
>
> Key: KAFKA-5659
> URL: https://issues.apache.org/jira/browse/KAFKA-5659
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> AdminClient#describeConfigs makes an extra empty request when only broker 
> info is requested



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2017-07-26 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5660:
--

 Summary: Don't throw TopologyBuilderException during runtime
 Key: KAFKA-5660
 URL: https://issues.apache.org/jira/browse/KAFKA-5660
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Matthias J. Sax


{{TopologyBuilderException}} is a pre-runtime exception that should only be 
thrown before {{KafkaStreams#start()}} is called.

However, we do throw {{TopologyBuilderException}} within

- `SourceNodeFactory#getTopics`
- `ProcessorContextImpl#getStateStore`

(and maybe somewhere else: we should double-check whether there are other 
places in the code like those).

We should replace those exceptions with either {{StreamsException}} or with a 
new exception type.
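
For illustration, a sketch of the suggested replacement in one of those spots 
(the method shape and the {{stores}} lookup are placeholders, not the actual 
ProcessorContextImpl code):

{code}
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;

public StateStore getStateStore(String name) {
    StateStore store = stores.get(name);  // placeholder lookup
    if (store == null)
        // runtime failure: use StreamsException (or a new runtime exception
        // type) rather than the pre-runtime TopologyBuilderException
        throw new StreamsException("State store " + name + " is not registered");
    return store;
}
{code}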



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2017-07-26 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102313#comment-16102313
 ] 

Apurva Mehta commented on KAFKA-5621:
-

I think the core dichotomy is that we have mirror-maker-like use cases and 
application use cases.
 
In the mirror maker use case, each partition is truly independent. If a subset 
of partitions are down, we still want to process the rest. So we want to expire 
batches and raise errors to the application (mirror maker in this case) as soon 
as possible. 

On the other hand, for an application, partitions are not really independent 
(and especially so if you use transactions). If one partition is down, it makes 
sense to wait for it to be ready before continuing. So we would want to handle 
as many errors internally as possible. It would mean blocking sends once the 
queue is too large and not expiring batches in the queue. This simplifies the 
application programming model. 

I think we should optimize the defaults for applications, but yet enable tools 
like mirror maker to get the desired behavior by setting the right configs.

Assuming that we complete [KAFKA-5494], we could apply retries to expired 
batches only when the idempotent producer is enabled. This way the default 
behavior is the simplest one for the application. 

KMM and other such tools could continue to use the producer without idempotence 
enabled and keep the existing behavior. Of course, we get into the same 
quandary if KMM wants to enable idempotence, but this is the best compromise 
without introducing an additional config. 

Another option is to introduce the 'queue.time.ms' config. The default would be 
infinite. When it is specified, we would not retry expired batches regardless 
of whether idempotence is enabled. So KMM like tooling could specify a value 
and most application developers could ignore it. 

I am not a fan of introducing new configs for a very narrow use case though, so 
I will continue to think of more alternatives.

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
> Fix For: 1.0.0
>
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch, up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5661) Develop an understanding of how to tune transactions for optimal performance

2017-07-26 Thread Apurva Mehta (JIRA)
Apurva Mehta created KAFKA-5661:
---

 Summary: Develop an understanding of how to tune transactions for 
optimal performance
 Key: KAFKA-5661
 URL: https://issues.apache.org/jira/browse/KAFKA-5661
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 0.11.0.0
Reporter: Apurva Mehta
Assignee: Apurva Mehta
 Fix For: 1.0.0


Currently, we don't have an idea of the throughput curve for transactions 
across a range of different workloads. 

Thus we would like to understand how to tune transactions so that they are 
viable across a broad range of workloads. For instance, what knobs can you 
tweak if you use small messages and still want acceptable transactional 
performance? We don't understand the performance curve across variables like 
message size, batch size, transaction duration, linger.ms, etc., and it would 
be good to get an understanding of this area and publish recommended 
configurations for different workloads.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5662) We should be able to specify min.insync.replicas for the __consumer_offsets topic

2017-07-26 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-5662:
---
Labels: needs-kip  (was: )

> We should be able to specify min.insync.replicas for the __consumer_offsets 
> topic
> -
>
> Key: KAFKA-5662
> URL: https://issues.apache.org/jira/browse/KAFKA-5662
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>  Labels: needs-kip
>
> The transaction log has a {{transaction.state.log.min.isr}} setting to 
> control the min.isr for the transaction log (by default the min.isr is 2 and 
> replication.factor is 3).
> Unfortunately, we don't have a similar setting for the offsets topic. We 
> should add an {{offsets.topic.min.isr}} setting and default it to 2 so that 
> we have durability on the offsets topic. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5662) We should be able to specify min.insync.replicas for the __consumer_offsets topic

2017-07-26 Thread Apurva Mehta (JIRA)
Apurva Mehta created KAFKA-5662:
---

 Summary: We should be able to specify min.insync.replicas for the 
__consumer_offsets topic
 Key: KAFKA-5662
 URL: https://issues.apache.org/jira/browse/KAFKA-5662
 Project: Kafka
  Issue Type: Bug
Reporter: Apurva Mehta


The transaction log has a {{transaction.state.log.min.isr}} setting to control 
the min.isr for the transaction log (by default the min.isr is 2 and 
replication.factor is 3).

Unfortunately, we don't have a similar setting for the offsets topic. We should 
add an {{offsets.topic.min.isr}} setting and default it to 2 so that we have 
durability on the offsets topic. 
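
Until such a setting exists, a topic-level override can arguably express the 
same intent. A sketch using the 0.11 AdminClient (assumes a reachable broker at 
localhost:9092; the override value mirrors the transaction log default):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class OffsetsTopicMinIsr {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource offsetsTopic =
                new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
            Config minIsr = new Config(Collections.singletonList(
                new ConfigEntry("min.insync.replicas", "2")));
            // Sets a topic-level override; a broker-level
            // offsets.topic.min.isr default is what this JIRA asks for.
            admin.alterConfigs(Collections.singletonMap(offsetsTopic, minIsr))
                 .all().get();
        }
    }
}
{code}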



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102357#comment-16102357
 ] 

Matthias J. Sax commented on KAFKA-4327:


Thanks [~ijuma]! This clarifies it.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> revers to ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires to update the docs with regard to broker 
> backward compatibility -- not all broker support "topic delete request" and 
> thus, the reset tool will not be backward compatible to all broker versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5663) LogDirFailureTest system test fails

2017-07-26 Thread Apurva Mehta (JIRA)
Apurva Mehta created KAFKA-5663:
---

 Summary: LogDirFailureTest system test fails
 Key: KAFKA-5663
 URL: https://issues.apache.org/jira/browse/KAFKA-5663
 Project: Kafka
  Issue Type: Bug
Reporter: Apurva Mehta
Assignee: Dong Lin


The recently added JBOD system test failed last night.

{noformat}
Producer failed to produce messages for 20s.
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 123, in run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 176, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
 line 321, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py",
 line 166, in test_replication_with_disk_failure
self.start_producer_and_consumer()
  File 
"/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py",
 line 75, in start_producer_and_consumer
self.producer_start_timeout_sec)
  File 
"/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
 line 36, in wait_until
raise TimeoutError(err_msg)
TimeoutError: Producer failed to produce messages for 20s.
{noformat}

Complete logs here:

http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5663) LogDirFailureTest system test fails

2017-07-26 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102370#comment-16102370
 ] 

Apurva Mehta commented on KAFKA-5663:
-

[~lindong] I assigned this to you since you added this test recently. Can you 
please take a look?

> LogDirFailureTest system test fails
> ---
>
> Key: KAFKA-5663
> URL: https://issues.apache.org/jira/browse/KAFKA-5663
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Dong Lin
>
> The recently added JBOD system test failed last night.
> {noformat}
> Producer failed to produce messages for 20s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py",
>  line 166, in test_replication_with_disk_failure
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 75, in start_producer_and_consumer
> self.producer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Producer failed to produce messages for 20s.
> {noformat}
> Complete logs here:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2017-07-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102394#comment-16102394
 ] 

Guozhang Wang commented on KAFKA-4327:
--

+1 to what [~ijuma] said.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}} that 
> revers to ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires to update the docs with regard to broker 
> backward compatibility -- not all broker support "topic delete request" and 
> thus, the reset tool will not be backward compatible to all broker versions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5664) Disable auto offset commit in ConsoleConsumer if no group is provided

2017-07-26 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5664:
--

 Summary: Disable auto offset commit in ConsoleConsumer if no group 
is provided
 Key: KAFKA-5664
 URL: https://issues.apache.org/jira/browse/KAFKA-5664
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


In ConsoleConsumer, if no group is provided, we generate a random groupId:
{code}
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new 
Random().nextInt(10)}")
{code}
In this case, since the group is not likely to be used again, we should disable 
automatic offset commits. This avoids polluting the coordinator cache with 
offsets that will never be used.
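
A sketch of the intended behavior, expressed in Java for brevity 
(ConsoleConsumer itself is Scala, and the option check shown here is a 
placeholder):

{code}
// When no --group is given, pair the throwaway generated group id with
// auto commit disabled, so one-off offsets never reach the coordinator cache.
if (!groupIdProvided) {  // placeholder for the real option check
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
        "console-consumer-" + new java.util.Random().nextInt(10));
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
{code}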



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5663) LogDirFailureTest system test fails

2017-07-26 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102403#comment-16102403
 ] 

Dong Lin commented on KAFKA-5663:
-

Thanks [~apurva]. I will look into this.

> LogDirFailureTest system test fails
> ---
>
> Key: KAFKA-5663
> URL: https://issues.apache.org/jira/browse/KAFKA-5663
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Dong Lin
>
> The recently added JBOD system test failed last night.
> {noformat}
> Producer failed to produce messages for 20s.
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py",
>  line 166, in test_replication_with_disk_failure
> self.start_producer_and_consumer()
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 75, in start_producer_and_consumer
> self.producer_start_timeout_sec)
>   File 
> "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Producer failed to produce messages for 20s.
> {noformat}
> Complete logs here:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5664) Disable auto offset commit in ConsoleConsumer if no group is provided

2017-07-26 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-5664:
--

Assignee: Vahid Hashemian

> Disable auto offset commit in ConsoleConsumer if no group is provided
> -
>
> Key: KAFKA-5664
> URL: https://issues.apache.org/jira/browse/KAFKA-5664
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>
> In ConsoleConsumer, if no group is provided, we generate a random groupId:
> {code}
> consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new 
> Random().nextInt(100000)}")
> {code}
> In this case, since the group is not likely to be used again, we should 
> disable automatic offset commits. This avoids polluting the coordinator cache 
> with offsets that will never be used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5007) Kafka Replica Fetcher Thread- Resource Leak

2017-07-26 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102622#comment-16102622
 ] 

huxihx commented on KAFKA-5007:
---

[~joseph.alias...@gmail.com] what's the status of this jira? Have you 
confirmed that this is the cause? If so, I could work on a fix.

> Kafka Replica Fetcher Thread- Resource Leak
> ---
>
> Key: KAFKA-5007
> URL: https://issues.apache.org/jira/browse/KAFKA-5007
> Project: Kafka
>  Issue Type: Bug
>  Components: core, network
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.10.2.0
> Environment: Centos 7
> Jave 8
>Reporter: Joseph Aliase
>Priority: Critical
>  Labels: reliability
> Attachments: jstack-kafka.out, jstack-zoo.out, lsofkafka.txt, 
> lsofzookeeper.txt
>
>
> Kafka runs out of open file descriptors when the system network interface is 
> down.
> Issue description:
> We have a Kafka cluster of 5 nodes running on version 0.10.1.1. The open file 
> descriptor limit for the account running Kafka is set to 10.
> During an upgrade, the network interface went down. The outage continued for 
> 12 hours; eventually all the brokers crashed with a java.io.IOException: Too 
> many open files error.
> We repeated the test in a lower environment and observed that the open socket 
> count keeps increasing while the NIC is down.
> We have around 13 topics with a maximum partition count of 120, and the 
> number of replica fetcher threads is set to 8.
> Using an internal monitoring tool, we observed that the open socket 
> descriptor count for the broker pid continued to increase while the NIC was 
> down, leading to the open file descriptor error.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread

2017-07-26 Thread huxihx (JIRA)
huxihx created KAFKA-5665:
-

 Summary: Incorrect interruption invoking method used for Heartbeat 
thread 
 Key: KAFKA-5665
 URL: https://issues.apache.org/jira/browse/KAFKA-5665
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.11.0.0
Reporter: huxihx
Assignee: huxihx
Priority: Minor


When interrupting the background heartbeat thread, `Thread.interrupted();` is 
used. Actually, `Thread.currentThread().interrupt();` should be used to 
restore the interruption status. An alternative fix is to remove 
`Thread.interrupted();` entirely: since HeartbeatThread extends Thread and all 
code higher up on the call stack is controlled, we could safely swallow this 
exception. Either way, `Thread.interrupted();` should not be used here; it 
tests and clears the interrupt flag rather than setting it.
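
A self-contained sketch of the difference; the worker thread below is 
illustrative, not the actual HeartbeatThread:
{code}
public class InterruptSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Thread.interrupted() would merely *test and clear* the
                // flag; re-interrupting restores it for callers up the stack.
                Thread.currentThread().interrupt();
            }
            // Prints true: the interruption status was restored.
            System.out.println("interrupted: "
                    + Thread.currentThread().isInterrupted());
        });
        worker.start();
        worker.interrupt();
        worker.join();
    }
}
{code}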



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102661#comment-16102661
 ] 

ASF GitHub Bot commented on KAFKA-5665:
---

GitHub user huxihx opened a pull request:

https://github.com/apache/kafka/pull/3586

KAFKA-5665: Heartbeat thread should use correct interruption method to 
restore status

When interrupting the background heartbeat thread, `Thread.interrupted();` 
is used. Actually, `Thread.currentThread().interrupt();` should be used to 
restore the interruption status. An alternative fix is to remove 
`Thread.interrupted();` entirely: since HeartbeatThread extends Thread and all 
code higher up on the call stack is controlled, we could safely swallow this 
exception. Either way, `Thread.interrupted();` should not be used here; it 
tests and clears the interrupt flag rather than setting it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huxihx/kafka KAFKA-5665

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3586


commit 36d489eede2229db92eda077ae4baff80044fb25
Author: huxihx 
Date:   2017-07-27T03:53:21Z

KAFKA-5665: Incorrect interruption invoking method used for Heartbeat thread

When interrupting the background heartbeat thread, `Thread.interrupted();` 
is used. Actually, `Thread.currentThread().interrupt();` should be used to 
restore the interruption status. An alternative fix is to remove 
`Thread.interrupted();` entirely: since HeartbeatThread extends Thread and all 
code higher up on the call stack is controlled, we could safely swallow this 
exception. Either way, `Thread.interrupted();` should not be used here; it 
tests and clears the interrupt flag rather than setting it.




> Incorrect interruption invoking method used for Heartbeat thread 
> -
>
> Key: KAFKA-5665
> URL: https://issues.apache.org/jira/browse/KAFKA-5665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When interrupting the background heartbeat thread, `Thread.interrupted();` is 
> used. Actually, `Thread.currentThread().interrupt();` should be used to 
> restore the interruption status. An alternative fix is to remove 
> `Thread.interrupted();` entirely: since HeartbeatThread extends Thread and 
> all code higher up on the call stack is controlled, we could safely swallow 
> this exception. Either way, `Thread.interrupted();` should not be used here; 
> it tests and clears the interrupt flag rather than setting it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5341) Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics

2017-07-26 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-5341:
-
Fix Version/s: 1.0.0

> Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics
> ---
>
> Key: KAFKA-5341
> URL: https://issues.apache.org/jira/browse/KAFKA-5341
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Dong Lin
>Assignee: Dong Lin
> Fix For: 1.0.0
>
>
> We currently have a metric for under-replicated partitions, but we do not 
> have a metric to track the number of partitions whose in-sync replica count < 
> minIsr. Partitions whose in-sync replica count < minIsr will be unavailable 
> to producers who use acks = all. It is important for Kafka operators to be 
> notified of the existence of such partitions because they reduce the 
> availability of the Kafka service.
> More specifically, we can define a per-broker metric 
> UnderMinIsrPartitionCount as "the number of partitions that this broker leads 
> for which in-sync replica count < minIsr". So if the RF is 3 and min ISR is 
> 2, then with 2 replicas in ISR the partition counts only toward 
> under-replicated partitions; with 1 replica in ISR, it also counts toward 
> UnderMinIsrPartitionCount.
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics
>  for more detail.
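
The proposed gauge semantics fit in a few lines; the {{PartitionView}} shape 
below is a made-up illustration (requires Java 16+ for the record), not broker 
code:
{code}
import java.util.List;

public class UnderMinIsrSketch {
    // Hypothetical per-partition view, for illustration only.
    record PartitionView(int isrSize, int minIsr, boolean ledByThisBroker) {}

    // The proposed per-broker gauge: partitions this broker leads whose
    // in-sync replica count has dropped below min.insync.replicas.
    static long underMinIsrPartitionCount(List<PartitionView> partitions) {
        return partitions.stream()
                .filter(PartitionView::ledByThisBroker)
                .filter(p -> p.isrSize() < p.minIsr())
                .count();
    }

    public static void main(String[] args) {
        // RF = 3, minIsr = 2: two replicas in ISR is only under-replicated;
        // one replica in ISR also counts toward UnderMinIsrPartitionCount.
        System.out.println(underMinIsrPartitionCount(List.of(
                new PartitionView(2, 2, true),
                new PartitionView(1, 2, true)))); // prints 1
    }
}
{code}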



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102758#comment-16102758
 ] 

ASF GitHub Bot commented on KAFKA-5611:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3571


> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
>Assignee: Jason Gustafson
>  Labels: reliability
> Fix For: 0.11.0.1
>
> Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, 
> Server 2, Server 3
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafka brokers on 0.10.2.0, with 0.9.0 compatibility 
> still on (other apps need it, but the app mentioned below is already on the 
> kafka 0.10.2.0 client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily, but then a timeout to an 
> external service causes our app to restart the Kafka Consumer on one of the 
> servers (this is by design). That causes a rebalance of the group, and upon 
> restart one of the consumers seems to "block".
>   - Server 3 is where the problems occur.
>   - The problem fixes itself either by restarting one of the 3 servers or by 
> causing the group to rebalance again, using the console consumer with 
> autocommit set to false and the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in the production environment, often enough. Hence I do 
> not have any logs with DEBUG/TRACE statements yet.
>  - Extracts from the log of each app server are attached, as well as the log 
> of the Kafka broker that seems to be handling the related group and 
> generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2017-07-26 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102759#comment-16102759
 ] 

Jason Gustafson commented on KAFKA-5611:


[~pskianis] We have merged the patch above. We would appreciate it if you 
could confirm whether or not it fixes the issue.

> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
>Assignee: Jason Gustafson
>  Labels: reliability
> Fix For: 0.11.0.1
>
> Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, 
> Server 2, Server 3
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafka brokers on 0.10.2.0, with 0.9.0 compatibility 
> still on (other apps need it, but the app mentioned below is already on the 
> kafka 0.10.2.0 client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily, but then a timeout to an 
> external service causes our app to restart the Kafka Consumer on one of the 
> servers (this is by design). That causes a rebalance of the group, and upon 
> restart one of the consumers seems to "block".
>   - Server 3 is where the problems occur.
>   - The problem fixes itself either by restarting one of the 3 servers or by 
> causing the group to rebalance again, using the console consumer with 
> autocommit set to false and the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in the production environment, often enough. Hence I do 
> not have any logs with DEBUG/TRACE statements yet.
>  - Extracts from the log of each app server are attached, as well as the log 
> of the Kafka broker that seems to be handling the related group and 
> generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)