[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to be sent regardless of value for max.in.flight.requests.per.connection
[ https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101249#comment-16101249 ] huxihx commented on KAFKA-5641: --- [~ijuma] "The broker only processes one request at a time, so even if you send the metadata request, it won't be processed until the processing of the previous request is completed" This does not seem to apply to METADATA requests, since the sender thread only drains ProducerBatches and ensures ordering by muting all batches from the same partitions before sending them out. In my opinion, NetworkClient could always process Metadata requests. The idea behind this jira is that we need not honor `max.in.flight.requests.per.connection` when updating the metadata. That is to say, we could create a new method `canSendMoreNonProduceRequest` which does not check the queue size. Do I make myself clear? > Metadata request should always be allowed to be sent regardless of value for > max.in.flight.requests.per.connection > -- > > Key: KAFKA-5641 > URL: https://issues.apache.org/jira/browse/KAFKA-5641 > Project: Kafka > Issue Type: Improvement > Components: network, producer >Affects Versions: 0.11.0.0 >Reporter: huxihx > > A Metadata request may fail to be sent when > `max.in.flight.requests.per.connection` is set to 1 and there is already an > in-flight request in the same node's queue, as shown below: > {code:title=NetworkClient.java|borderStyle=solid} > private long maybeUpdate(long now, Node node) { > String nodeConnectionId = node.idString(); > if (canSendRequest(nodeConnectionId)) { > .. > } > {code} > However, setting `max.in.flight.requests.per.connection` to 1 actually means > no reordering of the produced records; Metadata requests should have nothing > to do with this config. We don't have to check the in-flight request queue > size when sending a Metadata request. > [~ijuma] Does it make any sense? If yes, I will work on it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
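A self-contained sketch of the check huxihx proposes above; the class and fields are illustrative stand-ins rather than the real NetworkClient internals, and only the method name `canSendMoreNonProduceRequest` comes from the comment:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Produce requests honor max.in.flight.requests.per.connection to preserve
// ordering; metadata (non-produce) requests would skip the queue-size check.
final class InFlightGate {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Integer> inFlightPerNode = new HashMap<>();

    InFlightGate(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // Mirrors the queue-size condition checked by NetworkClient.canSendRequest
    // (connection and channel readiness checks are elided here).
    boolean canSendRequest(String node) {
        return inFlightPerNode.getOrDefault(node, 0) < maxInFlightRequestsPerConnection;
    }

    // The proposed variant: ignore the in-flight queue size entirely.
    boolean canSendMoreNonProduceRequest(String node) {
        return true; // readiness would still be checked in real code
    }
}
{code}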
[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5501: Summary: introduce async ZookeeperClient (was: use async zookeeper apis everywhere) > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper writes mean that we wait an entire round trip before > doing the next write. These synchronous writes happen at a > per-partition granularity in several places, so partition-heavy clusters > suffer from the controller doing many sequential round trips to zookeeper. > * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in > zookeeper on transition to OnlinePartition. This gets triggered per-partition > sequentially with synchronous writes during controlled shutdown of the > shutting down broker's replicas for which it is the leader. > * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to > OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets > triggered per-partition sequentially with synchronous writes for failed or > controlled shutdown brokers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere
Onur Karaman created KAFKA-5642: --- Summary: use async ZookeeperClient everywhere Key: KAFKA-5642 URL: https://issues.apache.org/jira/browse/KAFKA-5642 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman Synchronous zookeeper writes mean that we wait an entire round trip before doing the next write. These synchronous writes happen at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper. * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes during controlled shutdown of the shutting down broker's replicas for which it is the leader. * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially with synchronous writes for failed or controlled shutdown brokers. KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined requests to zookeeper. We should replace ZkClient's usage with this client. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5501: Description: Synchronous zookeeper apis mean that we wait an entire round trip before doing the next operation. We should introduce a zookeeper client that encourages pipelined requests to zookeeper. (was: Synchronous zookeeper writes mean that we wait an entire round trip before doing the next write. These synchronous writes happen at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper. * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes during controlled shutdown of the shutting down broker's replicas for which it is the leader. * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially with synchronous writes for failed or controlled shutdown brokers.) > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis mean that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
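For illustration, a minimal sketch of the pipelining idea using the raw ZooKeeper async API (the ZookeeperClient this ticket introduces is a richer abstraction, but the latency argument is the same): all writes go onto the wire before any response is awaited, so N updates cost roughly one round trip instead of N.
{code:java}
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class PipelinedZkWrites {
    // Paths and data are caller-supplied; handling of the rc result code is elided.
    static void setAll(ZooKeeper zk, List<String> paths, byte[] data) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(paths.size());
        StatCallback cb = (rc, path, ctx, stat) -> done.countDown();
        for (String path : paths)
            zk.setData(path, data, -1, cb, null); // returns immediately; requests pipeline
        done.await(); // wait once for all responses instead of once per write
    }
}
{code}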
[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5501. - Resolution: Fixed > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis mean that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101268#comment-16101268 ] Onur Karaman commented on KAFKA-5501: - [~ijuma] I reworded this ticket to be about introducing the client and KAFKA-5642 to be about using the client. With that, I went ahead and closed this ticket. > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis mean that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5641) Metadata request should always be allowed to be sent regardless of value for max.in.flight.requests.per.connection
[ https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101273#comment-16101273 ] Ismael Juma commented on KAFKA-5641: My comment was about the broker. And yes, the broker applies head-of-line blocking so that it processes one request at a time. If you send multiple requests, all but one are kept in the socket buffer. There is logic for this in the Selector. > Metadata request should always be allowed to be sent regardless of value for > max.in.flight.requests.per.connection > -- > > Key: KAFKA-5641 > URL: https://issues.apache.org/jira/browse/KAFKA-5641 > Project: Kafka > Issue Type: Improvement > Components: network, producer >Affects Versions: 0.11.0.0 >Reporter: huxihx > > A Metadata request may fail to be sent when > `max.in.flight.requests.per.connection` is set to 1 and there is already an > in-flight request in the same node's queue, as shown below: > {code:title=NetworkClient.java|borderStyle=solid} > private long maybeUpdate(long now, Node node) { > String nodeConnectionId = node.idString(); > if (canSendRequest(nodeConnectionId)) { > .. > } > {code} > However, setting `max.in.flight.requests.per.connection` to 1 actually means > no reordering of the produced records; Metadata requests should have nothing > to do with this config. We don't have to check the in-flight request queue > size when sending a Metadata request. > [~ijuma] Does it make any sense? If yes, I will work on it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5641) Metadata request should always be allowed to be sent regardless of value for max.in.flight.requests.per.connection
[ https://issues.apache.org/jira/browse/KAFKA-5641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101273#comment-16101273 ] Ismael Juma edited comment on KAFKA-5641 at 7/26/17 7:27 AM: - My comment was about the broker. And yes, the broker applies head-of-line blocking so that it processes one request at a time for a given connection. If you send multiple requests, all but one are kept in the socket buffer. There is logic for this in the Selector. was (Author: ijuma): My comment was about the broker. And yes, the broker applies head-of-line blocking so that it processes one request at a time. If you send multiple requests, all but one are kept in the socket buffer. There is logic for this in the Selector. > Metadata request should always be allowed to be sent regardless of value for > max.in.flight.requests.per.connection > -- > > Key: KAFKA-5641 > URL: https://issues.apache.org/jira/browse/KAFKA-5641 > Project: Kafka > Issue Type: Improvement > Components: network, producer >Affects Versions: 0.11.0.0 >Reporter: huxihx > > A Metadata request may fail to be sent when > `max.in.flight.requests.per.connection` is set to 1 and there is already an > in-flight request in the same node's queue, as shown below: > {code:title=NetworkClient.java|borderStyle=solid} > private long maybeUpdate(long now, Node node) { > String nodeConnectionId = node.idString(); > if (canSendRequest(nodeConnectionId)) { > .. > } > {code} > However, setting `max.in.flight.requests.per.connection` to 1 actually means > no reordering of the produced records; Metadata requests should have nothing > to do with this config. We don't have to check the in-flight request queue > size when sending a Metadata request. > [~ijuma] Does it make any sense? If yes, I will work on it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
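A conceptual sketch of that head-of-line blocking, using hypothetical interfaces (the real logic lives in Kafka's Selector, whose mute/unmute calls the broker drives):
{code:java}
// Hypothetical shapes for illustration only.
interface Channel { void mute(); void unmute(); }

final class HeadOfLineBlocking {
    // Once a complete request has been read, stop reading from the connection;
    // any further requests the client already sent sit in the socket buffer.
    void onRequestReceived(Channel ch) { ch.mute(); }

    // Resume reading only after the response is sent, so the broker handles
    // exactly one request at a time per connection.
    void onResponseSent(Channel ch) { ch.unmute(); }
}
{code}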
[jira] [Commented] (KAFKA-5562) Do streams state directory cleanup on a single thread
[ https://issues.apache.org/jira/browse/KAFKA-5562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101401#comment-16101401 ] ASF GitHub Bot commented on KAFKA-5562: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3516 > Do streams state directory cleanup on a single thread > - > > Key: KAFKA-5562 > URL: https://issues.apache.org/jira/browse/KAFKA-5562 > Project: Kafka > Issue Type: Bug >Reporter: Damian Guy >Assignee: Damian Guy > > Currently in streams we clean up old state directories every so often (as > defined by {{state.cleanup.delay.ms}}). However, every StreamThread runs the > cleanup, which is unnecessary and can potentially lead to race > conditions. > It would be better to perform the state cleanup on a single thread and only > when the {{KafkaStreams}} instance is in a running state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
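A rough sketch of the shape such a fix could take (names assumed for illustration, not the actual streams internals): one shared scheduler owns the cleanup, and the task no-ops unless the instance is running.
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class StateCleanupScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    void schedule(long cleanupDelayMs, BooleanSupplier isRunning, Runnable cleanup) {
        executor.scheduleAtFixedRate(() -> {
            if (isRunning.getAsBoolean())   // only clean up while the instance is RUNNING
                cleanup.run();              // single thread, so no races between StreamThreads
        }, cleanupDelayMs, cleanupDelayMs, TimeUnit.MILLISECONDS);
    }
}
{code}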
[jira] [Created] (KAFKA-5643) Using _DUCKTAPE_OPTIONS has no effect on executing tests
Paolo Patierno created KAFKA-5643: - Summary: Using _DUCKTAPE_OPTIONS has no effect on executing tests Key: KAFKA-5643 URL: https://issues.apache.org/jira/browse/KAFKA-5643 Project: Kafka Issue Type: Bug Components: system tests Reporter: Paolo Patierno Assignee: Paolo Patierno Hi, as described in the documentation, you should be able to enable debugging using the following line : _DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee debug_logs.txt However, _DUCKTAPE_OPTIONS isn't handled in the run_tests.sh script, so it's not passed on to ducker-ak and, ultimately, to the ducktape command line. Thanks, Paolo. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5643) Using _DUCKTAPE_OPTIONS has no effect on executing tests
[ https://issues.apache.org/jira/browse/KAFKA-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101465#comment-16101465 ] ASF GitHub Bot commented on KAFKA-5643: --- GitHub user ppatierno opened a pull request: https://github.com/apache/kafka/pull/3578 KAFKA-5643: Using _DUCKTAPE_OPTIONS has no effect on executing tests Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging) You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppatierno/kafka kafka-5643 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3578.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3578 commit 02e958a1ee7cdd5b7e81fcec45fba7326a4ac9fa Author: Paolo Patierno Date: 2017-07-26T09:52:52Z Added handling of _DUCKTAPE_OPTIONS (mainly for enabling debugging) > Using _DUCKTAPE_OPTIONS has no effect on executing tests > > > Key: KAFKA-5643 > URL: https://issues.apache.org/jira/browse/KAFKA-5643 > Project: Kafka > Issue Type: Bug > Components: system tests >Reporter: Paolo Patierno >Assignee: Paolo Patierno > > Hi, > as described in the documentation, you should be able to enable debugging > using the following line : > _DUCKTAPE_OPTIONS="--debug" bash tests/docker/run_tests.sh | tee > debug_logs.txt > However, _DUCKTAPE_OPTIONS isn't handled in the run_tests.sh script, so > it's not passed on to ducker-ak and, ultimately, to the ducktape command line. > Thanks, > Paolo. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5644) Transient test failure: ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime
Manikumar created KAFKA-5644: Summary: Transient test failure: ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime Key: KAFKA-5644 URL: https://issues.apache.org/jira/browse/KAFKA-5644 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.0 Reporter: Manikumar Priority: Minor {quote} unit.kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsToZonedDateTime FAILED java.lang.AssertionError: Expected the consumer group to reset to when offset was 50. at kafka.utils.TestUtils$.fail(TestUtils.scala:339) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:853) at unit.kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime(ResetConsumerGroupOffsetTest.scala:188) {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5592) Connection with plain client to SSL-secured broker causes OOM
[ https://issues.apache.org/jira/browse/KAFKA-5592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101565#comment-16101565 ] Marcin Łuczyński commented on KAFKA-5592: - It's very likely. > Connection with plain client to SSL-secured broker causes OOM > - > > Key: KAFKA-5592 > URL: https://issues.apache.org/jira/browse/KAFKA-5592 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.11.0.0 > Environment: Linux x86_64 x86_64 x86_64 GNU/Linux >Reporter: Marcin Łuczyński > Attachments: heapdump.20170713.100129.14207.0002.phd, Heap.PNG, > javacore.20170713.100129.14207.0003.txt, Snap.20170713.100129.14207.0004.trc, > Stack.PNG > > > While testing a connection from a client app that has no truststore configured > to a Kafka broker secured by SSL, my JVM crashes with OutOfMemoryError. I have > also seen it mixed with StackOverflowError. I attach dump files. > The stack trace to start with is here: > {quote}at java/nio/HeapByteBuffer.<init>(HeapByteBuffer.java:57) > at java/nio/ByteBuffer.allocate(ByteBuffer.java:331) > at > org/apache/kafka/common/network/NetworkReceive.readFromReadableChannel(NetworkReceive.java:93) > > at > org/apache/kafka/common/network/NetworkReceive.readFrom(NetworkReceive.java:71) > > at > org/apache/kafka/common/network/KafkaChannel.receive(KafkaChannel.java:169) > at org/apache/kafka/common/network/KafkaChannel.read(KafkaChannel.java:150) > at > org/apache/kafka/common/network/Selector.pollSelectionKeys(Selector.java:355) > at org/apache/kafka/common/network/Selector.poll(Selector.java:303) > at org/apache/kafka/clients/NetworkClient.poll(NetworkClient.java:349) > at > org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > > at > org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.poll(ConsumerNetworkClient.java:188) > > at > org/apache/kafka/clients/consumer/internals/AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:207) > > at > org/apache/kafka/clients/consumer/internals/AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) > > at > org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.poll(ConsumerCoordinator.java:279) > > at > org/apache/kafka/clients/consumer/KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > > at > org/apache/kafka/clients/consumer/KafkaConsumer.poll(KafkaConsumer.java:995) > at > com/ibm/is/cc/kafka/runtime/KafkaConsumerProcessor.process(KafkaConsumerProcessor.java:237) > > at > com/ibm/is/cc/kafka/runtime/KafkaProcessor.process(KafkaProcessor.java:173) > at > com/ibm/is/cc/javastage/connector/CC_JavaAdapter.run(CC_JavaAdapter.java:443){quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
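The stack trace above shows NetworkReceive allocating a buffer whose size comes straight off the wire. A hedged, self-contained illustration of the suspected failure mode follows — a plaintext client decoding TLS record bytes as a Kafka size prefix; the byte values are examples, not a capture from this report:
{code:java}
import java.nio.ByteBuffer;

public final class SizePrefixMisread {
    public static void main(String[] args) {
        // A TLS alert/handshake record starts with bytes like 0x15/0x16 0x03 0x01 ...
        ByteBuffer wire = ByteBuffer.wrap(new byte[] {0x15, 0x03, 0x01, 0x00});
        int size = wire.getInt(); // 0x15030100 = 352,518,400
        System.out.println("would allocate " + size + " bytes");
        // NetworkReceive.readFromReadableChannel then calls ByteBuffer.allocate(size),
        // which on a constrained heap ends in OutOfMemoryError.
    }
}
{code}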
[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic
[ https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101652#comment-16101652 ] Matthias J. Sax commented on KAFKA-5386: Understood. Technically, it would be possible to allow users to create changelog topics manually, but there are some strings attached. We have gotten issues with these naming conventions multiple times already, though, so maybe we need to do something about it. If you want to work on this, we would be more than happy. However, this change would require a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Let us know if you need any help preparing a KIP in case you want to pick it up. We can also discuss a little more on this JIRA. \cc [~miguno] [~guozhang] [~damianguy] [~enothereska] [~bbejeck] > [Kafka Streams] - custom name for state-store change-log topic > -- > > Key: KAFKA-5386 > URL: https://issues.apache.org/jira/browse/KAFKA-5386 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.1 >Reporter: Bart Vercammen > Labels: needs-kip > > Currently, when working with Kafka-backed state stores in Kafka Streams, > these log-compacted topics are given a hardcoded name: > _my.app.id-storename-changelog_ > {noformat}public static String storeChangelogTopic(String applicationId, > String storeName) { > return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX; > }{noformat} > It would be nice if I could somehow override this behavior > and provide the topic name myself when creating the state store. > Any comments? > Would it be OK to submit a PR for this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
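Purely as a hypothetical sketch of what the requested hook could look like (this is not an actual Kafka Streams API; the `override` parameter is invented for illustration):
{code:java}
final class ChangelogTopics {
    static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

    static String storeChangelogTopic(String applicationId, String storeName, String override) {
        if (override != null)
            return override; // user-provided changelog topic name wins
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }
}
{code}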
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101659#comment-16101659 ] Matthias J. Sax commented on KAFKA-4327: It's a Streams tool, so it belongs in package `o.a.k.streams.tools` -- we only put it into core because of the ZK dependency, and we did not want to add a ZK dependency to the streams module. \cc [~ijuma] [~guozhang] [~ewencp] > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module > due to its ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to the {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}}, which > refers to the ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.class.getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support the "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils
[ https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-3210. Resolution: Won't Fix We are following a slightly different approach, see KAFKA-5501. > Using asynchronous calls through the raw ZK API in ZkUtils > -- > > Key: KAFKA-3210 > URL: https://issues.apache.org/jira/browse/KAFKA-3210 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira > > We have observed a number of issues with the controller's interaction with > ZooKeeper, mainly because ZkClient creates new sessions transparently under > the hood. Creating sessions transparently enables, for example, an old > controller to successfully update znodes in ZooKeeper even when it isn't > the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass > the ZkClient lib like we did with ZKWatchedEphemeral. > In addition to fixing such races with the controller, it would improve > performance significantly if we used the async API (see KAFKA-3038). The > async API is more efficient because it pipelines the requests to ZooKeeper, > and the number of requests upon controller recovery can be large. > This jira proposes to make these two changes to the calls in ZkUtils, and to > do it, one path is to first replace the calls in ZkUtils with raw async ZK > calls and block, so that we don't have to change the controller code in this > phase. Once this step is accomplished and it is stable, we make changes to > the controller to handle the asynchronous calls to ZooKeeper. > Note that in the first step, we will need to introduce some new logic for > session management, which is currently handled entirely by ZkClient. We will > also need to implement the subscription mechanism for event notifications > (see ZooKeeperLeaderElector as an example). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5328) consider switching json parser from scala to jackson
[ https://issues.apache.org/jira/browse/KAFKA-5328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-5328. Resolution: Duplicate Duplicate of KAFKA-1595. > consider switching json parser from scala to jackson > > > Key: KAFKA-5328 > URL: https://issues.apache.org/jira/browse/KAFKA-5328 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The scala json parser is significantly slower than jackson. > This can have a nontrivial impact on controller initialization since the > controller loads and parses almost all zookeeper state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5642) Use async ZookeeperClient in Controller
[ https://issues.apache.org/jira/browse/KAFKA-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5642: --- Summary: Use async ZookeeperClient in Controller (was: use async ZookeeperClient everywhere) > Use async ZookeeperClient in Controller > --- > > Key: KAFKA-5642 > URL: https://issues.apache.org/jira/browse/KAFKA-5642 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Synchronous zookeeper writes mean that we wait an entire round trip before > doing the next write. These synchronous writes happen at a > per-partition granularity in several places, so partition-heavy clusters > suffer from the controller doing many sequential round trips to zookeeper. > * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in > zookeeper on transition to OnlinePartition. This gets triggered per-partition > sequentially with synchronous writes during controlled shutdown of the > shutting down broker's replicas for which it is the leader. > * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to > OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets > triggered per-partition sequentially with synchronous writes for failed or > controlled shutdown brokers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5645) Use async ZookeeperClient in SimpleAclAuthorizer
Ismael Juma created KAFKA-5645: -- Summary: Use async ZookeeperClient in SimpleAclAuthorizer Key: KAFKA-5645 URL: https://issues.apache.org/jira/browse/KAFKA-5645 Project: Kafka Issue Type: Sub-task Reporter: Ismael Juma -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5646) Use async ZookeeperClient for Config and ISR management
Ismael Juma created KAFKA-5646: -- Summary: Use async ZookeeperClient for Config and ISR management Key: KAFKA-5646 URL: https://issues.apache.org/jira/browse/KAFKA-5646 Project: Kafka Issue Type: Sub-task Reporter: Ismael Juma -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5647) Use async ZookeeperClient for Admin operations
Ismael Juma created KAFKA-5647: -- Summary: Use async ZookeeperClient for Admin operations Key: KAFKA-5647 URL: https://issues.apache.org/jira/browse/KAFKA-5647 Project: Kafka Issue Type: Sub-task Reporter: Ismael Juma -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5642) Use async ZookeeperClient in Controller
[ https://issues.apache.org/jira/browse/KAFKA-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101713#comment-16101713 ] Ismael Juma commented on KAFKA-5642: [~onurkaraman], I took a stab at splitting this JIRA into multiple ones. I think it will be easier to review if we do it this way instead of as a single PR. If you have other ideas on how we should do it, happy to update things. > Use async ZookeeperClient in Controller > --- > > Key: KAFKA-5642 > URL: https://issues.apache.org/jira/browse/KAFKA-5642 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Synchronous zookeeper writes mean that we wait an entire round trip before > doing the next write. These synchronous writes happen at a > per-partition granularity in several places, so partition-heavy clusters > suffer from the controller doing many sequential round trips to zookeeper. > * PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in > zookeeper on transition to OnlinePartition. This gets triggered per-partition > sequentially with synchronous writes during controlled shutdown of the > shutting down broker's replicas for which it is the leader. > * ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to > OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets > triggered per-partition sequentially with synchronous writes for failed or > controlled shutdown brokers. > KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined > requests to zookeeper. We should replace ZkClient's usage with this client. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-3038) Speeding up partition reassignment after broker failure
[ https://issues.apache.org/jira/browse/KAFKA-3038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101716#comment-16101716 ] ASF GitHub Bot commented on KAFKA-3038: --- Github user resetius closed the pull request at: https://github.com/apache/kafka/pull/2213 > Speeding up partition reassignment after broker failure > --- > > Key: KAFKA-3038 > URL: https://issues.apache.org/jira/browse/KAFKA-3038 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 0.9.0.0 >Reporter: Eno Thereska > > After a broker failure the controller does several writes to Zookeeper for > each partition on the failed broker. Writes are done one at a time, in a closed > loop, which is slow, especially on high-latency networks. Zookeeper has > support for batching operations (the "multi" API). It is expected that > substituting serial writes with batched ones should reduce failure handling > time by an order of magnitude. > This is identified as an issue in > https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3 > (section End-to-end latency during a broker failure) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
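For reference, a minimal sketch of the "multi" batching the description mentions, using the raw ZooKeeper API (the paths and data here are caller-supplied assumptions): many znode updates travel in a single request instead of one round trip each.
{code:java}
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList;
import java.util.List;

final class BatchedZkWrites {
    // All updates succeed or fail together and travel in one round trip.
    static List<OpResult> setAll(ZooKeeper zk, List<String> paths, byte[] data)
            throws KeeperException, InterruptedException {
        List<Op> ops = new ArrayList<>();
        for (String path : paths)
            ops.add(Op.setData(path, data, -1)); // -1 matches any znode version
        return zk.multi(ops);
    }
}
{code}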
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101715#comment-16101715 ] ASF GitHub Bot commented on KAFKA-1595: --- Github user resetius closed the pull request at: https://github.com/apache/kafka/pull/2214 > Remove deprecated and slower scala JSON parser > -- > > Key: KAFKA-1595 > URL: https://issues.apache.org/jira/browse/KAFKA-1595 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Jagbir >Assignee: Ismael Juma > Labels: newbie > > The following issue is created as a follow-up suggested by Jun Rao > in a kafka news group message with the Subject > "Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount" > SUMMARY: > An issue was detected in a typical cluster of 3 kafka instances backed > by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, > java version 1.7.0_65). On the consumer end, when consumers get recycled, > there is a troubling JSON parsing recursion which takes a busy lock and > blocks the consumer thread pool. > In the 0.8.1.1 scala client library, ZookeeperConsumerConnector.scala:355 takes > a global lock (0xd3a7e1d0) during the rebalance and fires an > expensive JSON parse, while keeping the other consumers from shutting > down, see, e.g., > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > The deep recursive JSON parsing should be deprecated in favor > of a better JSON parser, see, e.g., > http://engineering.ooyala.com/blog/comparing-scala-json-libraries? > DETAILS: > The first dump is for a recursive blocking thread holding the lock for > 0xd3a7e1d0 > and the subsequent dump is for a waiting thread. > (Please grep for 0xd3a7e1d0 to see the locked object.)
> -8<- > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor" > prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000] > java.lang.Thread.State: RUNNABLE > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$ano
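For contrast with the parser-combinator recursion above, a minimal sketch of the direction this ticket proposes — parsing with Jackson, which needs neither deep recursion nor a shared lock (assuming the jackson-databind dependency):
{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public final class JacksonParse {
    private static final ObjectMapper MAPPER = new ObjectMapper(); // thread-safe once configured

    public static JsonNode parse(String json) throws IOException {
        return MAPPER.readTree(json); // iterative parsing, no global lock needed
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parse("{\"version\":1,\"partitions\":{\"0\":[1,2]}}"));
    }
}
{code}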
[jira] [Created] (KAFKA-5648) make Merger extend Aggregator
Clemens Valiente created KAFKA-5648: --- Summary: make Merger extend Aggregator Key: KAFKA-5648 URL: https://issues.apache.org/jira/browse/KAFKA-5648 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 0.11.0.0 Reporter: Clemens Valiente Assignee: Clemens Valiente Priority: Minor Hi, I suggest that Merger should extend Aggregator. Reason: both interfaces usually do very similar things. A merger takes two sessions and combines them; an aggregator takes an existing session and aggregates new values into it. In some use cases it is actually the same thing, e.g. a .map() step that produces a list per key, followed by .groupByKey().aggregate() that combines those lists. In this case both the merger and the aggregator do the same thing: take two lists and combine them into one. With the proposed change we could pass the Merger as both the merger and the aggregator to the .aggregate() method and keep our business logic within one merger class. Or in other words: the Merger is simply an Aggregator that happens to aggregate two objects of the same class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
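A short sketch of the proposal, using simplified stand-ins for the streams interfaces (the real ones are org.apache.kafka.streams.kstream.Aggregator and Merger): when the value and aggregate types coincide, a Merger is just an Aggregator over two values of the same type, so one class can be passed as both arguments to aggregate().
{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the Kafka Streams interface.
interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

// The proposal: a Merger is an Aggregator whose value and aggregate types match.
interface Merger<K, VA> extends Aggregator<K, VA, VA> { }

// One implementation now serves as both the aggregator and the session merger.
final class ListMerger<K, T> implements Merger<K, List<T>> {
    @Override
    public List<T> apply(K key, List<T> one, List<T> two) {
        List<T> merged = new ArrayList<>(one);
        merged.addAll(two); // "take two lists and combine them into one"
        return merged;
    }
}
{code}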
[jira] [Created] (KAFKA-5649) Producer is being closed generating ssl exception
Pablo Panero created KAFKA-5649: --- Summary: Producer is being closed generating ssl exception Key: KAFKA-5649 URL: https://issues.apache.org/jira/browse/KAFKA-5649 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.10.2.1 Environment: Spark 2.2.0 and kafka 0.10.2.0 Reporter: Pablo Panero Priority: Minor On a streaming job using the built-in kafka source and sink (over SSL), I am getting the following exception: Config of the source: {code:java} val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", config.bootstrapServers) .option("failOnDataLoss", value = false) .option("kafka.connections.max.idle.ms", 360) //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication. .option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.mechanism", "GSSAPI") .option("kafka.sasl.kerberos.service.name", "kafka") .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") .option("kafka.ssl.truststore.password", "changeit") .option("subscribe", config.topicConfigList.keys.mkString(",")) .load() {code} Config of the sink: {code:java} .writeStream .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/") .format("kafka") .option("kafka.bootstrap.servers", config.bootstrapServers) .option("kafka.connections.max.idle.ms", 360) //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication. .option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.mechanism", "GSSAPI") .option("kafka.sasl.kerberos.service.name", "kafka") .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") .option("kafka.ssl.truststore.password", "changeit") .start() {code} {code:java} 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) at org.apache.kafka.common.network.Selector.close(Selector.java:531) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) at org.apache.kafka.common.network.Selector.poll(Selector.java:303) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) at
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.
[jira] [Created] (KAFKA-5650) Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)
Damian Guy created KAFKA-5650: - Summary: Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182) Key: KAFKA-5650 URL: https://issues.apache.org/jira/browse/KAFKA-5650 Project: Kafka Issue Type: Bug Reporter: Damian Guy Assignee: Damian Guy As per KIP-182: A new interface will be added: {code} /** * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/logging etc. */ public interface StateStoreBuilder<T extends StateStore> { StateStoreBuilder<T> withCachingEnabled(); StateStoreBuilder<T> withCachingDisabled(); StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config); StateStoreBuilder<T> withLoggingDisabled(); T build(); } {code} This interface will be used to wrap stores with caching, logging etc. Additionally some convenience methods on the {{Stores}} class: {code} public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name, final int capacity, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name, final Windows windows, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name, final SessionWindows windows, final Serde<K> keySerde, final Serde<V> valueSerde) /** * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier) public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier) public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier) {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
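A hedged usage sketch of the quoted draft interface, with stub store types standing in for the real ones (the API was explicitly not final at this point):
{code:java}
import java.util.Collections;
import java.util.Map;

public final class StateStoreBuilderDemo {
    interface StateStore { }
    static final class MyStore implements StateStore { }

    interface StateStoreBuilder<T extends StateStore> {
        StateStoreBuilder<T> withCachingEnabled();
        StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
        T build();
    }

    public static void main(String[] args) {
        StateStoreBuilder<MyStore> builder = new StateStoreBuilder<MyStore>() {
            public StateStoreBuilder<MyStore> withCachingEnabled() { return this; }
            public StateStoreBuilder<MyStore> withLoggingEnabled(Map<String, String> config) { return this; }
            public MyStore build() { return new MyStore(); }
        };
        // Chain the wrappers, then build the store handed to the processor.
        MyStore store = builder.withCachingEnabled()
                .withLoggingEnabled(Collections.singletonMap("cleanup.policy", "compact"))
                .build();
        System.out.println(store);
    }
}
{code}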
[jira] [Updated] (KAFKA-5649) Producer is being closed generating ssl exception
[ https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pablo Panero updated KAFKA-5649: Description: On a streaming job using the built-in kafka source and sink (over SSL), I am getting the following exception: Config of the source: {code:java} val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", config.bootstrapServers) .option("failOnDataLoss", value = false) .option("kafka.connections.max.idle.ms", 360) //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication. .option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.mechanism", "GSSAPI") .option("kafka.sasl.kerberos.service.name", "kafka") .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") .option("kafka.ssl.truststore.password", "changeit") .option("subscribe", config.topicConfigList.keys.mkString(",")) .load() {code} Config of the sink: {code:java} .writeStream .option("checkpointLocation", s"${config.checkpointDir}/${topicConfig._1}/") .format("kafka") .option("kafka.bootstrap.servers", config.bootstrapServers) .option("kafka.connections.max.idle.ms", 360) //SSL: this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication. .option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.mechanism", "GSSAPI") .option("kafka.sasl.kerberos.service.name", "kafka") .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") .option("kafka.ssl.truststore.password", "changeit") .start() {code} And in some cases it throws the exception, making the spark job stuck in that step.
Exception stack trace is the following: {code:java} 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) at org.apache.kafka.common.network.Selector.close(Selector.java:531) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) at org.apache.kafka.common.network.Selector.poll(Selector.java:303) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157) at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.ap
[jira] [Created] (KAFKA-5651) KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines
Damian Guy created KAFKA-5651: - Summary: KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines Key: KAFKA-5651 URL: https://issues.apache.org/jira/browse/KAFKA-5651 Project: Kafka Issue Type: New Feature Reporter: Damian Guy Assignee: Damian Guy -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5650) Provide a simple way for custom storage engines to use streams wrapped stores (KIP-182)
[ https://issues.apache.org/jira/browse/KAFKA-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy updated KAFKA-5650: -- Issue Type: Sub-task (was: Bug) Parent: KAFKA-5651 > Provide a simple way for custom storage engines to use streams wrapped stores > (KIP-182) > --- > > Key: KAFKA-5650 > URL: https://issues.apache.org/jira/browse/KAFKA-5650 > Project: Kafka > Issue Type: Sub-task >Reporter: Damian Guy >Assignee: Damian Guy > > As per KIP-182: > A new interface will be added: > {code} > /** > * Implementations of this will provide the ability to wrap a given StateStore > * with or without caching/logging etc. > */ > public interface StateStoreBuilder<T extends StateStore> { > > StateStoreBuilder<T> withCachingEnabled(); > StateStoreBuilder<T> withCachingDisabled(); > StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config); > StateStoreBuilder<T> withLoggingDisabled(); > T build(); > } > {code} > This interface will be used to wrap stores with caching, logging etc. > Additionally some convenience methods on the {{Stores}} class: > {code} > public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) > public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) > public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name, final int capacity, final Serde<K> keySerde, final Serde<V> valueSerde) > public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name, final Windows windows, final Serde<K> keySerde, final Serde<V> valueSerde) > public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name, final SessionWindows windows, final Serde<K> keySerde, final Serde<V> valueSerde) > > /** > * The following methods are for use with the PAPI. They allow building of > StateStores that can be wrapped with > * caching, logging, and any other convenient wrappers provided by the > KafkaStreams library > */ > public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier) > > public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier) > > public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5652) Add new api methods to KStream
Damian Guy created KAFKA-5652: - Summary: Add new api methods to KStream Key: KAFKA-5652 URL: https://issues.apache.org/jira/browse/KAFKA-5652 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Assignee: Damian Guy Add new methods from KIP-182 to {{KStream}} until finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5653) Add new API methods to KTable
Damian Guy created KAFKA-5653: - Summary: Add new API methods to KTable Key: KAFKA-5653 URL: https://issues.apache.org/jira/browse/KAFKA-5653 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5654) Add new API methods to KGroupedStream
Damian Guy created KAFKA-5654: - Summary: Add new API methods to KGroupedStream Key: KAFKA-5654 URL: https://issues.apache.org/jira/browse/KAFKA-5654 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Assignee: Damian Guy Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5655) Add new API methods to KGroupedTable
Damian Guy created KAFKA-5655: - Summary: Add new API methods to KGroupedTable Key: KAFKA-5655 URL: https://issues.apache.org/jira/browse/KAFKA-5655 Project: Kafka Issue Type: Sub-task Reporter: Damian Guy Placeholder until API finalized -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5656) Support bulk attributes request on KafkaMbean where some attributes do not exist
Erik Kringen created KAFKA-5656: --- Summary: Support bulk attributes request on KafkaMbean where some attributes do not exist Key: KAFKA-5656 URL: https://issues.apache.org/jira/browse/KAFKA-5656 Project: Kafka Issue Type: Bug Components: clients Reporter: Erik Kringen Priority: Minor According to Oracle documentation on [Implementing a Dynamic MBean|http://docs.oracle.com/cd/E19698-01/816-7609/6mdjrf83d/index.html] bq. The bulk getter and setter methods usually rely on the generic getter and setter, respectively. This makes them independent of the management interface, which can simplify certain modifications. In this case, their implementation consists mostly of error checking on the list of attributes. However, all bulk getters and setters must be implemented so that an error on any one attribute does not interrupt or invalidate the bulk operation on the other attributes. bq. If an attribute cannot be read, then its name-value pair is not included in the list of results. If an attribute cannot be written, it will not be copied to the returned list of successful set operations. As a result, if there are any errors, the lists returned by bulk operators will not have the same length as the array or list passed to them. In any case, the bulk operators do not guarantee that their returned lists have the same ordering of attributes as the input array or list. The current implementation of {code}org.apache.kafka.common.metrics.JmxReporter.KafkaMbean#getAttributes{code} returns an empty list if any of the requested attributes are not found. This method should instead log the exception but allow all requested attributes that are present to be returned, as prescribed by the DynamicMBean interface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
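A simplified sketch of the behavior the ticket asks for (the real class is JmxReporter.KafkaMbean; this stand-in shows only the skip-instead-of-abort logic):
{code:java}
import javax.management.Attribute;
import javax.management.AttributeList;
import java.util.HashMap;
import java.util.Map;

final class TolerantAttributeReader {
    private final Map<String, Object> metrics = new HashMap<>();

    AttributeList getAttributes(String[] names) {
        AttributeList list = new AttributeList();
        for (String name : names) {
            Object value = metrics.get(name);
            if (value != null)
                list.add(new Attribute(name, value)); // include everything that resolves
            // else: log and continue; one missing attribute must not empty the result
        }
        return list;
    }
}
{code}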
[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled
[ https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101932#comment-16101932 ] Jiangjie Qin commented on KAFKA-5621: - [~ijuma] [~apurva] The expiration for messages in the accumulator was not for memory footprint control, but for making progress when a partition is stuck. For example, if the leader of a partition becomes unavailable for some reason, the records in the accumulator cannot be sent. Retrying only makes sense when the producer can actually send, so we have to expire the records at some point when that partition cannot make progress; whether they are expired after request_timeout or retries * request_timeout can be discussed. But note that clients will sometimes set retries to Integer.MAX_VALUE, which would also result in unexpected behavior. The reasons for having an explicit batch.expiry.ms are: 1) we have already exposed the concept of batching to the users through batch.size and linger.ms, so users know the producer sends batches and no new concept is added. 2) If a record has been sitting in the record accumulator for more than batch.expiry.ms, there is likely a very long queue or the producer cannot make progress, so users may want to get an exception and do something. This expiration time is also kind of an SLO and is not necessarily related to request_timeout * retries, which is intended for the remote call, so decoupling them would be useful. > The producer should retry expired batches when retries are enabled > -- > > Key: KAFKA-5621 > URL: https://issues.apache.org/jira/browse/KAFKA-5621 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > Fix For: 1.0.0 > > > Today, when a batch is expired in the accumulator, a {{TimeoutException}} is > raised to the user. > It might be better for the producer to retry the expired batch up to the > configured number of retries. This is more intuitive from the user's point of > view. > Further, the proposed behavior makes it easier for applications like mirror > maker to provide ordering guarantees even when batches expire. Today, they > would resend the expired batch and it would get added to the back of the > queue, causing the output ordering to be different from the input ordering. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
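A tiny sketch of the decoupling being argued for; note that batch.expiry.ms was only a proposal at this point, so the name and wiring below are hypothetical:
{code:java}
final class BatchExpiry {
    // Expiry is an SLO on accumulator dwell time, checked independently of the
    // request.timeout.ms / retries budget that governs in-flight remote calls.
    static boolean isExpired(long createdMs, long nowMs, long batchExpiryMs) {
        return nowMs - createdMs > batchExpiryMs;
    }
}
{code}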
[jira] [Commented] (KAFKA-5656) Support bulk attributes request on KafkaMbean where some attributes do not exist
[ https://issues.apache.org/jira/browse/KAFKA-5656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101941#comment-16101941 ] ASF GitHub Bot commented on KAFKA-5656: --- GitHub user ErikKringen opened a pull request: https://github.com/apache/kafka/pull/3582 KAFKA-5656: Support bulk attributes request on KafkaMbean where some Support bulk attributes request on KafkaMbean where some attributes do not exist You can merge this pull request into a Git repository by running: $ git pull https://github.com/ErikKringen/kafka trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3582.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3582 commit 0febcdd59cee9e1f34bdd9646aee59944c28386e Author: Erik.Kringen Date: 2017-07-26T17:12:04Z KAFKA-5656: Support bulk attributes request on KafkaMbean where some attributes do not exist > Support bulk attributes request on KafkaMbean where some attributes do not > exist > > > Key: KAFKA-5656 > URL: https://issues.apache.org/jira/browse/KAFKA-5656 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Erik Kringen >Priority: Minor > > According to Oracle documentation on [Implementing a Dynamic > MBean|http://docs.oracle.com/cd/E19698-01/816-7609/6mdjrf83d/index.html] > bq. The bulk getter and setter methods usually rely on the generic getter and > setter, respectively. This makes them independent of the management > interface, which can simplify certain modifications. In this case, their > implementation consists mostly of error checking on the list of attributes. > However, all bulk getters and setters must be implemented so that an error on > any one attribute does not interrupt or invalidate the bulk operation on the > other attributes. > bq. If an attribute cannot be read, then its name-value pair is not included > in the list of results. If an attribute cannot be written, it will not be > copied to the returned list of successful set operations. As a result, if > there are any errors, the lists returned by bulk operators will not have the > same length as the array or list passed to them. In any case, the bulk > operators do not guarantee that their returned lists have the same ordering > of attributes as the input array or list. > The current implementation of > {code}org.apache.kafka.common.metrics.JmxReporter.KafkaMbean#getAttributes{code} > returns an empty list if any of the requested attributes are not found. > This method should instead log the exception but allow all requested > attributes that are present to be returned, as prescribed by the > DynamicMBean interface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101966#comment-16101966 ] Ewen Cheslack-Postava commented on KAFKA-4327: -- [~mjsax] We'd have to decide whether the Java API for the command is considered public or just the sh script that executes it. If it is, we could move the tool but would want some deprecation period for the original w/ some logging about the deprecation. But otherwise I agree, the natural home for the tool is in streams. > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, Kafka Streams Application Reset Tool is part of {{core}} module > due to ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}} that > refers to ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5341) Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics
[ https://issues.apache.org/jira/browse/KAFKA-5341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101975#comment-16101975 ] ASF GitHub Bot commented on KAFKA-5341: --- GitHub user lindong28 opened a pull request: https://github.com/apache/kafka/pull/3583 KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics (KIP-164) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lindong28/kafka KAFKA-5341 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3583.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3583 commit 93f541c249def6bdc158cb79592278d18cdf3ff8 Author: Dong Lin Date: 2017-05-28T08:10:28Z KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics (KIP-164) > Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics > --- > > Key: KAFKA-5341 > URL: https://issues.apache.org/jira/browse/KAFKA-5341 > Project: Kafka > Issue Type: New Feature >Reporter: Dong Lin >Assignee: Dong Lin > > We currently have under replicated partitions, but we do not have a metric to > track the number of partitions whose in-sync replicas count < minIsr. > Partitions whose in-sync replicas count < minIsr will be unavailable to those > producers who use acks = all. It is important for Kafka operators to be > notified of the existence of such partitions because their existence reduces > the availability of the Kafka service. > More specifically, we can define a per-broker metric > UnderMinIsrPartitionCount as "The number of partitions that this broker leads > for which in-sync replicas count < minIsr." So if the RF was 3, and min ISR > is 2, then when there are 2 replicas in ISR this partition would be in the > under replicated partitions count. When there is 1 replica in ISR, this > partition would also be in the UnderMinIsrPartitionCount. > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics > for more detail. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
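A small sketch of the metric's definition as stated in the KIP text above; the {{Partition}} type and its accessors are hypothetical stand-ins for illustration (the real broker code is Scala, and this is not it).
{code:java}
import java.util.Collection;

// Sketch of the KIP-164 definition: count the partitions this broker leads
// whose in-sync replica set has shrunk below min.insync.replicas.
public final class UnderMinIsrSketch {
    interface Partition {
        int isrSize();  // current in-sync replica count (hypothetical accessor)
        int minIsr();   // effective min.insync.replicas (hypothetical accessor)
    }

    static int underMinIsrPartitionCount(Collection<Partition> ledByThisBroker) {
        int count = 0;
        for (Partition p : ledByThisBroker) {
            if (p.isrSize() < p.minIsr())
                count++;  // such a partition is unavailable to producers using acks=all
        }
        return count;
    }
}
{code}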
[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled
[ https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102096#comment-16102096 ] Apurva Mehta commented on KAFKA-5621: - > The expiration for messages in the accumulator was not for memory footprint > control, but for making progress when a partition is stuck This is a fair point. Since we have one queue per partition, there is no real reason why a single stuck partition should affect the others. I think what happens today is that the next {{producer.send}} to the stuck partition will just block because there is no space remaining, and that affects everything else. From this point of view, expiring accumulator batches does make sense. Is that right? On your second point, about the concept of a batch being exposed: you are right, of course. Tuning your producer batches is an important part of getting the best performance out of Kafka. We have the {{linger.ms}} and {{batch.size}} configs to control this and these are fairly intuitive to use. As a user, you would think "If I have an expected X byte/s steady state throughput and I want batches of Y bytes, I should set linger to Z ms to make sure I get my optimal batching". In this context, exposing the notion of a client side record accumulator with batch expiry _is_ a new concept in my opinion. > The producer should retry expired batches when retries are enabled > -- > > Key: KAFKA-5621 > URL: https://issues.apache.org/jira/browse/KAFKA-5621 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > Fix For: 1.0.0 > > > Today, when a batch is expired in the accumulator, a {{TimeoutException}} is > raised to the user. > It might be better for the producer to retry the expired batch up to the > configured number of retries. This is more intuitive from the user's point of > view. > Further the proposed behavior makes it easier for applications like mirror > maker to provide ordering guarantees even when batches expire. Today, they > would resend the expired batch and it would get added to the back of the > queue, causing the output ordering to be different from the input ordering. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
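The X/Y/Z reasoning above is simple arithmetic: to fill a Y-byte batch at X bytes/s, linger for roughly 1000 * Y / X milliseconds. A back-of-the-envelope sketch with purely hypothetical numbers:
{code:java}
// Hypothetical numbers, only to illustrate the X/Y/Z reasoning above.
public class LingerSketch {
    public static void main(String[] args) {
        long bytesPerSec = 512_000;  // X: expected steady-state throughput to a partition
        long batchBytes = 16_384;    // Y: desired batch size (batch.size)
        long lingerMs = (1000 * batchBytes) / bytesPerSec;  // Z = 1000*Y/X
        System.out.println("linger.ms ~= " + lingerMs + " ms"); // ~32 ms here
    }
}
{code}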
[jira] [Commented] (KAFKA-5648) make Merger extend Aggregator
[ https://issues.apache.org/jira/browse/KAFKA-5648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102119#comment-16102119 ] Matthias J. Sax commented on KAFKA-5648: Your observation is correct that {{Merger}} and {{Aggregator}} are similar. You also stated correctly that the types are different though, as the {{Merger}} merges two aggregates of the same type, while the {{Aggregator}} in general merges a single value (of type A) into an aggregate (of type B). Thus, {{Merger<K, V>}} corresponds only to the special case {{Aggregator<K, V, V>}}, in which the value and aggregate types coincide. > make Merger extend Aggregator > -- > > Key: KAFKA-5648 > URL: https://issues.apache.org/jira/browse/KAFKA-5648 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Clemens Valiente >Assignee: Clemens Valiente >Priority: Minor > > Hi, > I suggest that Merger should extend Aggregator. > reason: > Both classes usually do very similar things. A merger takes two sessions and > combines them, an aggregator takes an existing session and aggregates new > values into it. > in some use cases it is actually the same thing, e.g.: > {{<K, V>}} -> .map() to {{<K, List<V>>}} -> .groupByKey().aggregate() to {{<K, List<V>>}} > > In this case both merger and aggregator do the same thing: take two lists and > combine them into one. > With the proposed change we could pass the Merger as both the merger and > aggregator to the .aggregate() method and keep our business logic within one > merger class. > Or in other words: The Merger is simply an Aggregator that happens to > aggregate two objects of the same class -- This message was sent by Atlassian JIRA (v6.4.14#64029)
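For reference, the shapes of the two callbacks being compared, written out as plain Java interfaces (signatures as in the 0.11 Streams API; treat this as a sketch, not the library source):
{code:java}
// Shapes of the two Kafka Streams callbacks under discussion (sketch).
public interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate); // folds one value into an aggregate
}

public interface Merger<K, V> {
    V apply(K aggKey, V aggOne, V aggTwo);  // combines two aggregates of the same type
}

// A Merger<K, V> is structurally an Aggregator<K, V, V>: both take a key plus
// two V's and return a V. That is the special case the comment above refers to.
{code}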
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102134#comment-16102134 ] Matthias J. Sax commented on KAFKA-4327: It is definitely part of public API, and I am fine with keeping the current one as deprecated. On the other hand, it has the annotation {{@InterfaceStability.Unstable}} and thus we can remove it without further deprecation, too. I cannot judge if people use the class from Java code or only use it from the command line. So I am fine either way (direct move or deprecating). > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, Kafka Streams Application Reset Tool is part of {{core}} module > due to ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}} that > refers to ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102147#comment-16102147 ] Ismael Juma commented on KAFKA-4327: We discussed this at the time and we documented that it's _not_ part of public API so that we could move it to the right location: {code} This class is not part of public API. For backward compatibility, use the provided script in "bin/" instead of calling this class directly from your code. {code} > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, Kafka Streams Application Reset Tool is part of {{core}} module > due to ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}} that > refers to ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5657) Connect REST API should include the connector type when describing a connector
Randall Hauch created KAFKA-5657: Summary: Connect REST API should include the connector type when describing a connector Key: KAFKA-5657 URL: https://issues.apache.org/jira/browse/KAFKA-5657 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 0.11.0.0 Reporter: Randall Hauch Fix For: 1.0.0 Kafka Connect's REST API's {{connectors/}} and {{connectors/{name}}} endpoints should include whether the connector is a source or a sink. See KAFKA-4343 and KIP-151 for the related modification of the {{connector-plugins}} endpoint. Also see KAFKA-4279 for converter-related endpoints. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
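A hypothetical illustration of what the {{connectors/\{name\}}} response might look like with the proposed field added; the {{type}} field and its placement are assumptions, while the surrounding fields follow the existing response shape:
{noformat}
{
  "name": "local-file-sink",
  "type": "sink",            <-- proposed addition: "source" or "sink"
  "config": { ... },
  "tasks": [ ... ]
}
{noformat}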
[jira] [Updated] (KAFKA-4279) REST endpoint to list converter plugins
[ https://issues.apache.org/jira/browse/KAFKA-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-4279: - Labels: needs-kip newbie (was: ) > REST endpoint to list converter plugins > --- > > Key: KAFKA-4279 > URL: https://issues.apache.org/jira/browse/KAFKA-4279 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gwen Shapira >Priority: Minor > Labels: needs-kip, newbie > > We have a REST resource that allows users to see the available plugins, but > we have no equivalent that allows listing available converters. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5657) Connect REST API should include the connector type when describing a connector
[ https://issues.apache.org/jira/browse/KAFKA-5657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-5657: - Description: Kafka Connect's REST API's {{connectors/}} and {{connectors/\{name\}}} endpoints should include whether the connector is a source or a sink. See KAFKA-4343 and KIP-151 for the related modification of the {{connector-plugins}} endpoint. Also see KAFKA-4279 for converter-related endpoints. was: Kafka Connect's REST API's {{connectors/}} and {{connectors/{name}}} endpoints should include whether the connector is a source or a sink. See KAFKA-4343 and KIP-151 for the related modification of the {{connector-plugins}} endpoint. Also see KAFKA-4279 for converter-related endpoints. > Connect REST API should include the connector type when describing a connector > -- > > Key: KAFKA-5657 > URL: https://issues.apache.org/jira/browse/KAFKA-5657 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Randall Hauch > Labels: needs-kip, newbie > Fix For: 1.0.0 > > > Kafka Connect's REST API's {{connectors/}} and {{connectors/\{name\}}} > endpoints should include whether the connector is a source or a sink. > See KAFKA-4343 and KIP-151 for the related modification of the > {{connector-plugins}} endpoint. > Also see KAFKA-4279 for converter-related endpoints. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled
[ https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102176#comment-16102176 ] Jiangjie Qin commented on KAFKA-5621: - [~apurva] Yes, I agree that expiry ms is a new concept as it is an additional thing users may want to think about, i.e. "If I have a partition unavailable temporarily, how long am I willing to wait for it to come back?" Arguably this can also be derived from request timeout and retries. But the difference here is that those two configs are primarily for other cases, and in practice we found it is quite tricky (if possible at all) to get them right for the batch expiration. > The producer should retry expired batches when retries are enabled > -- > > Key: KAFKA-5621 > URL: https://issues.apache.org/jira/browse/KAFKA-5621 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > Fix For: 1.0.0 > > > Today, when a batch is expired in the accumulator, a {{TimeoutException}} is > raised to the user. > It might be better for the producer to retry the expired batch up to the > configured number of retries. This is more intuitive from the user's point of > view. > Further the proposed behavior makes it easier for applications like mirror > maker to provide ordering guarantees even when batches expire. Today, they > would resend the expired batch and it would get added to the back of the > queue, causing the output ordering to be different from the input ordering. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change
[ https://issues.apache.org/jira/browse/KAFKA-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102200#comment-16102200 ] Per Steffensen commented on KAFKA-5505: --- bq. There's been some discussion about more incremental rebalancing, but as you add/remove tasks, there's no way to avoid the fact that to keep the work balanced we may need to stop/start/move some tasks. I can handle restart of tasks! It is just a significant overhead if it happens all the time - and it does, as long as all tasks are restarted every time the set of tasks changes. It will not be a problem if it happens "from time to time" due to rebalance. What annoys me most is actually that the connector itself is restarted when the set of tasks changes - there is no good reason for that at all, as I see it. The problem is that it takes some time before my connector can build up its set of tasks after it (re)starts, because it has to talk with other components to get the entire set of tasks. But the connector has to give a set of tasks almost immediately after (re)start, or things will start behaving strangely. Therefore my connector has to start out saying that its set of tasks is empty, and then change the set of tasks (calling context.requestTaskReconfiguration) along the way, as it learns about more and more tasks. But when it does so, the connector itself is restarted, and starts over with an empty set of tasks. It makes the process go: connector started -> empty set of tasks -> some tasks -> connector restarted -> empty set of tasks -> some tasks -> connector restarted -> ... I really have to hack to make it work. If we could just make a change where the connector is not restarted when it changes its set of tasks, it would be a big step. bq. Can you explain why you have task sets changing so frequently? Ohhh, it is a fairly long explanation in my case. But in general I do not have a hard time imagining connectors with a changing set of tasks. I believe you already have a source-connector out-of-the-box that can copy from a relational database table. Imagine that you would like to extend it to be able to copy all tables of that database, running one task per table. I guess that would be a fairly reasonable extension. If the set of tables changes often, the set of tasks of this connector would change often. bq. It's possible that a different way of assigning partitions to tasks might avoid rebalancing all the time. Well, I did that for now. Actually I changed it so that I always have exactly one task, and inside that single task I handle all the stuff that would otherwise be distributed between tasks. My single task runs one thread per "partition in the source" - basically one thread where I would like to have had one task. It works the same, but it will not scale, because one task has to run on one machine. Being able to split into several tasks will help scale. One machine will definitely be able to handle one "partition in the source", but it may not be able to handle "all partitions in the source". I could also take this principle and scale to another fixed number (N) of tasks, higher than one. Then task no. M (M from 0 to N-1) will handle "partitions in the source" P where hash(id-of-P) modulo N is M - see the sketch after this message.
So I have ways around the problem, but I think the requested change would be nice in general, and something people will expect to be available, especially since it is possible to change the set of tasks along the way - I know I was surprised that it did not already work as requested here. > Connect: Do not restart connector and existing tasks on task-set change > --- > > Key: KAFKA-5505 > URL: https://issues.apache.org/jira/browse/KAFKA-5505 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.1 >Reporter: Per Steffensen > > I am writing a connector with a frequently changing task-set. It is really > not working very well, because the connector and all existing tasks are > restarted when the set of tasks changes. E.g. if the connector is running > with 10 tasks, and an additional task is needed, the connector itself and all > 10 existing tasks are restarted, just to make the 11th task run also. My > tasks have a fairly heavy initialization, making it extra annoying. I would > like to see a change, introducing a "mode", where only new/deleted tasks are > started/stopped when notifying the system that the set of tasks changed > (calling context.requestTaskReconfiguration() - or something similar). > Discussed this issue a little on d...@kafka.apache.org in the thread "Kafka > Connect: To much restarting with a SourceConnector with dynamic set of tasks" -- This message was sent by Atlassian JIRA (v6.4.14#64029)
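A minimal sketch of the fixed-N assignment scheme described in the comment above; {{partitionId}} stands for the source-partition id, and nothing here is a Connect API call:
{code:java}
// Sketch: with a fixed number N of tasks, task M owns every source partition P
// whose hash(id-of-P) modulo N equals M. Math.floorMod keeps the bucket
// non-negative even when hashCode() is negative.
public final class TaskAssignmentSketch {
    static boolean ownedByTask(String partitionId, int taskId, int numTasks) {
        return Math.floorMod(partitionId.hashCode(), numTasks) == taskId;
    }

    public static void main(String[] args) {
        int numTasks = 4; // N, fixed up front so the task set never changes
        for (String p : new String[] {"p-a", "p-b", "p-c"}) {
            for (int m = 0; m < numTasks; m++) {
                if (ownedByTask(p, m, numTasks))
                    System.out.println(p + " -> task " + m);
            }
        }
    }
}
{code}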
[jira] [Created] (KAFKA-5658) adminclient will stop working after some amount of time
dan norwood created KAFKA-5658: -- Summary: adminclient will stop working after some amount of time Key: KAFKA-5658 URL: https://issues.apache.org/jira/browse/KAFKA-5658 Project: Kafka Issue Type: Bug Reporter: dan norwood if i create an admin client and let it sit unused for some amount of time, then attempt to use it i will get the following {noformat} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.BrokerNotAvailableException {noformat} even though the broker is up. if before each usage of adminclient i create a new admin client i do not see the same behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5658: --- Labels: reliability (was: ) > adminclient will stop working after some amount of time > --- > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > if i create an admin client and let it sit unused for some amount of time, > then attempt to use it i will get the following > {noformat} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException > {noformat} > even though the broker is up. if before each usage of adminclient i create a > new admin client i do not see the same behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5658: --- Priority: Critical (was: Major) > adminclient will stop working after some amount of time > --- > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > if i create an admin client and let it sit unused for some amount of time, > then attempt to use it i will get the following > {noformat} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException > {noformat} > even though the broker is up. if before each usage of adminclient i create a > new admin client i do not see the same behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5658) adminclient will stop working after some amount of time
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5658: --- Fix Version/s: 0.11.0.1 > adminclient will stop working after some amount of time > --- > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood > Labels: reliability > Fix For: 0.11.0.1 > > > if i create an admin client and let it sit unused for some amount of time, > then attempt to use it i will get the following > {noformat} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException > {noformat} > even though the broker is up. if before each usage of adminclient i create a > new admin client i do not see the same behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-5658) adminclient will stop working after some amount of time
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-5658: -- Assignee: Colin P. McCabe > adminclient will stop working after some amount of time > --- > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood >Assignee: Colin P. McCabe >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > if i create an admin client and let it sit unused for some amount of time, > then attempt to use it i will get the following > {noformat} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException > {noformat} > even though the broker is up. if before each usage of adminclient i create a > new admin client i do not see the same behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe updated KAFKA-5658: --- Summary: Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions (was: adminclient will stop working after some amount of time) > Fix AdminClient request timeout handling bug resulting in continual > BrokerNotAvailableExceptions > > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood >Assignee: Colin P. McCabe >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > if i create an admin client and let it sit unused for some amount of time, > then attempt to use it i will get the following > {noformat} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException > {noformat} > even though the broker is up. if before each usage of adminclient i create a > new admin client i do not see the same behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe updated KAFKA-5658: --- Description: The AdminClient does not properly clear calls from the callsInFlight structure. Later, in an effort to clear the lingering call objects, it closes the connection they are associated with. This disrupts new incoming calls, which then get {{java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.BrokerNotAvailableException}}. (was: if i create an admin client and let it sit unused for some amount of time, then attempt to use it i will get the following {noformat} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.BrokerNotAvailableException {noformat} even though the broker is up. if before each usage of adminclient i create a new admin client i do not see the same behavior.) > Fix AdminClient request timeout handling bug resulting in continual > BrokerNotAvailableExceptions > > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood >Assignee: Colin P. McCabe >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > The AdminClient does not properly clear calls from the callsInFlight > structure. Later, in an effort to clear the lingering call objects, it > closes the connection they are associated with. This disrupts new incoming > calls, which then get {{java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5658) Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions
[ https://issues.apache.org/jira/browse/KAFKA-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102290#comment-16102290 ] ASF GitHub Bot commented on KAFKA-5658: --- GitHub user cmccabe opened a pull request: https://github.com/apache/kafka/pull/3584 KAFKA-5658. Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions The AdminClient does not properly clear calls from the callsInFlight structure. Later, in an effort to clear the lingering call objects, it closes the connection they are associated with. This disrupts new incoming calls, which then get BrokerNotAvailableException. This patch fixes this bug by properly removing completed calls from the callsInFlight structure. It also adds the Call#aborted flag, which ensures that we only abort a connection once, even if there is a similar bug in the future which causes old Call objects to linger. This patch also fixes a case where AdminClient#describeConfigs was making an extra RPC that had no useful effect. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cmccabe/kafka KAFKA-5658 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3584.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3584 commit 7eedf51d2b29565460f04f78435f2bdf5a9cd661 Author: Colin P. Mccabe Date: 2017-07-17T17:04:58Z KAFKA-5602: ducker-ak: support --custom-ducktape Support a --custom-ducktape flag which allows developers to install their own versions of ducktape into Docker images. This is helpful for ducktape development. commit 811983f02cb1ff887bbe75ffc22ef51f98a99a36 Author: Colin P. Mccabe Date: 2017-07-26T20:57:18Z KAFKA-5658. Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions > Fix AdminClient request timeout handling bug resulting in continual > BrokerNotAvailableExceptions > > > Key: KAFKA-5658 > URL: https://issues.apache.org/jira/browse/KAFKA-5658 > Project: Kafka > Issue Type: Bug >Reporter: dan norwood >Assignee: Colin P. McCabe >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > The AdminClient does not properly clear calls from the callsInFlight > structure. Later, in an effort to clear the lingering call objects, it > closes the connection they are associated with. This disrupts new incoming > calls, which then get {{java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.BrokerNotAvailableException}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
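An illustrative sketch of the two fixes the PR text describes: removing completed calls promptly, and aborting a lingering call's connection at most once. Every name here ({{Call}}, {{callsInFlight}}, the methods) is a hypothetical stand-in, not the actual AdminClient internals:
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the internals described above.
final class Call {
    final String nodeId;
    final long deadlineMs;
    private boolean aborted = false;  // fix 2: abort a connection at most once
    Call(String nodeId, long deadlineMs) { this.nodeId = nodeId; this.deadlineMs = deadlineMs; }
    boolean markAborted() {           // returns true only the first time
        if (aborted) return false;
        aborted = true;
        return true;
    }
}

final class CallTrackerSketch {
    private final Map<String, List<Call>> callsInFlight = new HashMap<>();

    // Fix 1: remove a call from callsInFlight as soon as its response is handled,
    // so later timeout sweeps never mistake completed calls for lingering ones.
    void onResponse(Call call) {
        List<Call> calls = callsInFlight.get(call.nodeId);
        if (calls != null)
            calls.remove(call);
    }

    // Fix 2: even if a stale call somehow lingers, its connection is closed only once.
    void sweepExpired(long nowMs) {
        for (List<Call> calls : callsInFlight.values())
            for (Call c : calls)
                if (c.deadlineMs < nowMs && c.markAborted())
                    closeConnection(c.nodeId);
    }

    private void closeConnection(String nodeId) { /* disconnect from the broker */ }
}
{code}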
[jira] [Created] (KAFKA-5659) AdminClient#describeConfigs makes an extra empty request when only broker info is requested
Colin P. McCabe created KAFKA-5659: -- Summary: AdminClient#describeConfigs makes an extra empty request when only broker info is requested Key: KAFKA-5659 URL: https://issues.apache.org/jira/browse/KAFKA-5659 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.0 Reporter: Colin P. McCabe Assignee: Colin P. McCabe AdminClient#describeConfigs makes an extra empty request when only broker info is requested -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5659) AdminClient#describeConfigs makes an extra empty request when only broker info is requested
[ https://issues.apache.org/jira/browse/KAFKA-5659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102309#comment-16102309 ] ASF GitHub Bot commented on KAFKA-5659: --- GitHub user cmccabe opened a pull request: https://github.com/apache/kafka/pull/3585 KAFKA-5659. AdminClient#describeConfigs makes an extra empty request … …when only broker info is requested You can merge this pull request into a Git repository by running: $ git pull https://github.com/cmccabe/kafka KAFKA-5659 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3585.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3585 commit 593ed34ce08c19a34f2146999f9e07a02927ca61 Author: Colin P. Mccabe Date: 2017-07-26T21:20:12Z KAFKA-5659. AdminClient#describeConfigs makes an extra empty request when only broker info is requested > AdminClient#describeConfigs makes an extra empty request when only broker > info is requested > --- > > Key: KAFKA-5659 > URL: https://issues.apache.org/jira/browse/KAFKA-5659 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe > > AdminClient#describeConfigs makes an extra empty request when only broker > info is requested -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
Matthias J. Sax created KAFKA-5660: -- Summary: Don't throw TopologyBuilderException during runtime Key: KAFKA-5660 URL: https://issues.apache.org/jira/browse/KAFKA-5660 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.0 Reporter: Matthias J. Sax {{TopologyBuilderException}} is a pre-runtime exception that should only be thrown before {{KafkaStreams#start()}} is called. However, we do throw {{TopologyBuilderException}} within - `SourceNodeFactory#getTopics` - `ProcessorContextImpl#getStateStore` (and maybe elsewhere: we should double-check whether there are other places in the code like those). We should replace those exceptions with either {{StreamsException}} or with a new exception type. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled
[ https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102313#comment-16102313 ] Apurva Mehta commented on KAFKA-5621: - I think the core dichotomy is that we have mirror-maker-like use cases and application use cases. In the mirror maker use case, each partition is truly independent. If a subset of partitions are down, we still want to process the rest. So we want to expire batches and raise errors to the application (mirror maker in this case) as soon as possible. On the other hand, for an application, partitions are not really independent (and especially so if you use transactions). If one partition is down, it makes sense to wait for it to be ready before continuing. So we would want to handle as many errors internally as possible. It would mean blocking sends once the queue is too large and not expiring batches in the queue. This simplifies the application programming model. I think we should optimize the defaults for applications, but yet enable tools like mirror maker to get the desired behavior by setting the right configs. Assuming that we complete [KAFKA-5494], we could apply retries to expired batches only when the idempotent producer is enabled. This way the default behavior is the simplest one for the application. KMM and other such tools could continue to use the producer without idempotence enabled and keep the existing behavior. Of course, we get into the same quandary if KMM wants to enable idempotence, but this is the best compromise without introducing an additional config. Another option is to introduce the 'queue.time.ms' config. The default would be infinite. When it is specified, we would not retry expired batches regardless of whether idempotence is enabled. So KMM-like tooling could specify a value and most application developers could ignore it. I am not a fan of introducing new configs for a very narrow use case though, so I will continue to think of more alternatives. > The producer should retry expired batches when retries are enabled > -- > > Key: KAFKA-5621 > URL: https://issues.apache.org/jira/browse/KAFKA-5621 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > Fix For: 1.0.0 > > > Today, when a batch is expired in the accumulator, a {{TimeoutException}} is > raised to the user. > It might be better for the producer to retry the expired batch up to the > configured number of retries. This is more intuitive from the user's point of > view. > Further the proposed behavior makes it easier for applications like mirror > maker to provide ordering guarantees even when batches expire. Today, they > would resend the expired batch and it would get added to the back of the > queue, causing the output ordering to be different from the input ordering. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5661) Develop an understanding of how to tune transactions for optimal performance
Apurva Mehta created KAFKA-5661: --- Summary: Develop an understanding of how to tune transactions for optimal performance Key: KAFKA-5661 URL: https://issues.apache.org/jira/browse/KAFKA-5661 Project: Kafka Issue Type: Sub-task Affects Versions: 0.11.0.0 Reporter: Apurva Mehta Assignee: Apurva Mehta Fix For: 1.0.0 Currently, we don't have an idea of the throughput curve for transactions across a range of different workloads. Thus we would like to understand how to tune transactions so that they are viable across a broad range of workloads. For instance, what knobs can you tweak to still get acceptable transactional performance if you use small messages? We don't understand the performance curve across variables like message size, batch size, transaction duration, linger.ms, etc., and it would be good to get an understanding of this area and publish recommended configurations for different workloads. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5662) We should be able to specify min.insync.replicas for the __consumer_offsets topic
[ https://issues.apache.org/jira/browse/KAFKA-5662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-5662: --- Labels: needs-kip (was: ) > We should be able to specify min.insync.replicas for the __consumer_offsets > topic > - > > Key: KAFKA-5662 > URL: https://issues.apache.org/jira/browse/KAFKA-5662 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta > Labels: needs-kip > > The transaction log has a {{transaction.state.log.min.isr}} setting to > control the min.isr for the transaction log (by default the min.isr is 2 and > replication.factor is 3). > Unfortunately, we don't have a similar setting for the offsets topic. We > should add an {{offsets.topic.min.isr}} setting and default it to 2 so that > we have durability on the offsets topic. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5662) We should be able to specify min.insync.replicas for the __consumer_offsets topic
Apurva Mehta created KAFKA-5662: --- Summary: We should be able to specify min.insync.replicas for the __consumer_offsets topic Key: KAFKA-5662 URL: https://issues.apache.org/jira/browse/KAFKA-5662 Project: Kafka Issue Type: Bug Reporter: Apurva Mehta The transaction log has a {{transaction.state.log.min.isr}} setting to control the min.isr for the transaction log (by default the min.isr is 2 and replication.factor is 3). Unfortunately, we don't have a similar setting for the offsets topic. We should add an {{offsets.topic.min.isr}} setting and default it to 2 so that we have durability on the offsets topic. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
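For comparison, a sketch of the relevant broker settings; the transaction-log entries and {{offsets.topic.replication.factor}} exist today (defaults shown), while {{offsets.topic.min.isr}} is the setting this JIRA proposes and does not exist yet:
{noformat}
# Existing transaction-log settings (defaults shown):
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

# Existing offsets-topic replication setting (default shown):
offsets.topic.replication.factor=3
# Proposed by this JIRA (not an existing config):
offsets.topic.min.isr=2
{noformat}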
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102357#comment-16102357 ] Matthias J. Sax commented on KAFKA-4327: Thanks [~ijuma]! This clarifies it. > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, Kafka Streams Application Reset Tool is part of {{core}} module > due to ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}} that > refers to ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5663) LogDirFailureTest system test fails
Apurva Mehta created KAFKA-5663: --- Summary: LogDirFailureTest system test fails Key: KAFKA-5663 URL: https://issues.apache.org/jira/browse/KAFKA-5663 Project: Kafka Issue Type: Bug Reporter: Apurva Mehta Assignee: Dong Lin The recently added JBOD system test failed last night. {noformat} Producer failed to produce messages for 20s. Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 123, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", line 176, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", line 321, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py", line 166, in test_replication_with_disk_failure self.start_producer_and_consumer() File "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 75, in start_producer_and_consumer self.producer_start_timeout_sec) File "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", line 36, in wait_until raise TimeoutError(err_msg) TimeoutError: Producer failed to produce messages for 20s. {noformat} Complete logs here: http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5663) LogDirFailureTest system test fails
[ https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102370#comment-16102370 ] Apurva Mehta commented on KAFKA-5663: - [~lindong] I assigned this to you since you added this test recently. Can you please take a look? > LogDirFailureTest system test fails > --- > > Key: KAFKA-5663 > URL: https://issues.apache.org/jira/browse/KAFKA-5663 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta >Assignee: Dong Lin > > The recently added JBOD system test failed last night. > {noformat} > Producer failed to produce messages for 20s. > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 123, in run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 176, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", > line 321, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py", > line 166, in test_replication_with_disk_failure > self.start_producer_and_consumer() > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py", > line 75, in start_producer_and_consumer > self.producer_start_timeout_sec) > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", > line 36, in wait_until > raise TimeoutError(err_msg) > TimeoutError: Producer failed to produce messages for 20s. > {noformat} > Complete logs here: > http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102394#comment-16102394 ] Guozhang Wang commented on KAFKA-4327: -- +1 to what [~ijuma] said. > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, Kafka Streams Application Reset Tool is part of {{core}} module > due to ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}} that > refers to ResetTool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5664) Disable auto offset commit in ConsoleConsumer if no group is provided
Jason Gustafson created KAFKA-5664: -- Summary: Disable auto offset commit in ConsoleConsumer if no group is provided Key: KAFKA-5664 URL: https://issues.apache.org/jira/browse/KAFKA-5664 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson In ConsoleConsumer, if no group is provided, we generate a random groupId: {code} consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(10)}") {code} In this case, since the group is not likely to be used again, we should disable automatic offset commits. This avoids polluting the coordinator cache with offsets that will never be used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
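A minimal sketch of the proposed default (the actual tool is Scala; this just illustrates the idea in Java). {{userSpecifiedGroup}} is a hypothetical flag for whether {{--group}} was passed:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class ConsoleConsumerSketch {
    // If the group id was generated rather than user-supplied, its offsets will
    // never be read again, so default auto-commit off (explicit user settings win).
    static void maybeDisableAutoCommit(Properties consumerProps, boolean userSpecifiedGroup) {
        if (!userSpecifiedGroup)
            consumerProps.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}
{code}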
[jira] [Commented] (KAFKA-5663) LogDirFailureTest system test fails
[ https://issues.apache.org/jira/browse/KAFKA-5663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102403#comment-16102403 ] Dong Lin commented on KAFKA-5663: - Thanks [~apurva]. I will look into this. > LogDirFailureTest system test fails > --- > > Key: KAFKA-5663 > URL: https://issues.apache.org/jira/browse/KAFKA-5663 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta >Assignee: Dong Lin > > The recently added JBOD system test failed last night. > {noformat} > Producer failed to produce messages for 20s. > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 123, in run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py", > line 176, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py", > line 321, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/core/log_dir_failure_test.py", > line 166, in test_replication_with_disk_failure > self.start_producer_and_consumer() > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/tests/kafkatest/tests/produce_consume_validate.py", > line 75, in start_producer_and_consumer > self.producer_start_timeout_sec) > File > "/home/jenkins/workspace/system-test-kafka-trunk/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py", > line 36, in wait_until > raise TimeoutError(err_msg) > TimeoutError: Producer failed to produce messages for 20s. > {noformat} > Complete logs here: > http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-07-26--001.1501074756--apache--trunk--91c207c/LogDirFailureTest/test_replication_with_disk_failure/bounce_broker=False.security_protocol=PLAINTEXT.broker_type=follower/48.tgz -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-5664) Disable auto offset commit in ConsoleConsumer if no group is provided
[ https://issues.apache.org/jira/browse/KAFKA-5664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian reassigned KAFKA-5664: -- Assignee: Vahid Hashemian > Disable auto offset commit in ConsoleConsumer if no group is provided > - > > Key: KAFKA-5664 > URL: https://issues.apache.org/jira/browse/KAFKA-5664 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Vahid Hashemian > > In ConsoleConsumer, if no group is provided, we generate a random groupId: > {code} > consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new > Random().nextInt(10)}") > {code} > In this case, since the group is not likely to be used again, we should > disable automatic offset commits. This avoids polluting the coordinator cache > with offsets that will never be used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5007) Kafka Replica Fetcher Thread- Resource Leak
[ https://issues.apache.org/jira/browse/KAFKA-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102622#comment-16102622 ] huxihx commented on KAFKA-5007: --- [~joseph.alias...@gmail.com] what's the status for this jira? Have you confirmed that it's the reason? If yes, I could work on a fix. > Kafka Replica Fetcher Thread- Resource Leak > --- > > Key: KAFKA-5007 > URL: https://issues.apache.org/jira/browse/KAFKA-5007 > Project: Kafka > Issue Type: Bug > Components: core, network >Affects Versions: 0.10.0.0, 0.10.1.1, 0.10.2.0 > Environment: Centos 7 > Java 8 >Reporter: Joseph Aliase >Priority: Critical > Labels: reliability > Attachments: jstack-kafka.out, jstack-zoo.out, lsofkafka.txt, > lsofzookeeper.txt > > > Kafka is running out of open file descriptors when the system network interface is > down. > Issue description: > We have a Kafka cluster of 5 nodes running on version 0.10.1.1. The open file > descriptor limit for the account running Kafka is set to 10. > During an upgrade, the network interface went down. The outage continued for 12 hours; > eventually all the brokers crashed with a java.io.IOException: Too many open > files error. > We repeated the test in a lower environment and observed that the open socket > count keeps increasing while the NIC is down. > We have around 13 topics with a max partition size of 120, and the number of replica > fetcher threads is set to 8. > Using an internal monitoring tool we observed that the open socket descriptor count > for the broker pid continued to increase although the NIC was down, leading to the > open file descriptor error. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread
huxihx created KAFKA-5665: - Summary: Incorrect interruption invoking method used for Heartbeat thread Key: KAFKA-5665 URL: https://issues.apache.org/jira/browse/KAFKA-5665 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.11.0.0 Reporter: huxihx Assignee: huxihx Priority: Minor When interrupting the background heartbeat thread, `Thread.interrupted();` is used. Actually, `Thread.currentThread().interrupt();` should be used to restore the interruption status. An alternative way to solve this is to remove `Thread.interrupted();` since HeartbeatThread extends Thread and all code higher up on the call stack is controlled, so we could safely swallow this exception. Anyway, `Thread.interrupted();` should not be used here. It's a test method, not an action. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
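The distinction in question, as a self-contained illustration: {{Thread.interrupted()}} tests *and clears* the current thread's interrupt status, while {{Thread.currentThread().interrupt()}} restores it so callers further up the stack can still observe the interruption:
{code:java}
public class InterruptStatusDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Wrong here: Thread.interrupted() merely tests-and-clears the flag.
                // Right: restore the status so code higher up can observe it.
                Thread.currentThread().interrupt();
            }
            // Prints 'true' because the interrupt status was restored above.
            System.out.println("still interrupted? " + Thread.currentThread().isInterrupted());
        });
        worker.start();
        worker.interrupt();
        worker.join();
    }
}
{code}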
[jira] [Commented] (KAFKA-5665) Incorrect interruption invoking method used for Heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-5665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102661#comment-16102661 ] ASF GitHub Bot commented on KAFKA-5665: --- GitHub user huxihx opened a pull request: https://github.com/apache/kafka/pull/3586 KAFKA-5665: Heartbeat thread should use correct interruption method to restore status When interrupting the background heartbeat thread, `Thread.interrupted();` is used. Actually, `Thread.currentThread().interrupt();` should be used to restore the interruption status. An alternative solution is to remove `Thread.interrupted();` since HeartbeatThread extends Thread and all code higher up on the call stack is controlled, so we could safely swallow this exception. In any case, `Thread.interrupted();` should not be used here: it is a method that tests (and clears) the interrupt status, not one that sets it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/huxihx/kafka KAFKA-5665 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3586.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3586 commit 36d489eede2229db92eda077ae4baff80044fb25 Author: huxihx Date: 2017-07-27T03:53:21Z KAFKA-5665: Incorrect interruption invoking method used for Heartbeat thread When interrupting the background heartbeat thread, `Thread.interrupted();` is used. Actually, `Thread.currentThread().interrupt();` should be used to restore the interruption status. An alternative solution is to remove `Thread.interrupted();` since HeartbeatThread extends Thread and all code higher up on the call stack is controlled, so we could safely swallow this exception. In any case, `Thread.interrupted();` should not be used here: it is a method that tests (and clears) the interrupt status, not one that sets it. > Incorrect interruption invoking method used for Heartbeat thread > - > > Key: KAFKA-5665 > URL: https://issues.apache.org/jira/browse/KAFKA-5665 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.11.0.0 >Reporter: huxihx >Assignee: huxihx >Priority: Minor > > When interrupting the background heartbeat thread, `Thread.interrupted();` is > used. Actually, `Thread.currentThread().interrupt();` should be used to > restore the interruption status. An alternative solution is to remove > `Thread.interrupted();` since HeartbeatThread extends Thread and all code > higher up on the call stack is controlled, so we could safely swallow this > exception. In any case, `Thread.interrupted();` should not be used here: it > is a method that tests (and clears) the interrupt status, not one that sets > it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5341) Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics
[ https://issues.apache.org/jira/browse/KAFKA-5341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-5341: - Fix Version/s: 1.0.0 > Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics > --- > > Key: KAFKA-5341 > URL: https://issues.apache.org/jira/browse/KAFKA-5341 > Project: Kafka > Issue Type: New Feature >Reporter: Dong Lin >Assignee: Dong Lin > Fix For: 1.0.0 > > > We currently have an under-replicated partitions metric, but we do not have a > metric to track the number of partitions whose in-sync replica count < > minIsr. > Partitions whose in-sync replica count < minIsr will be unavailable to > producers that use acks = all. It is important for Kafka operators to be > notified of the existence of such partitions because their existence reduces > the availability of the Kafka service. > More specifically, we can define a per-broker metric > UnderMinIsrPartitionCount as "The number of partitions that this broker leads > for which the in-sync replica count < minIsr." So if the RF is 3 and min ISR > is 2, then with 2 replicas in ISR the partition would be in the > under-replicated partitions count. With 1 replica in ISR, the partition would > also be in the UnderMinIsrPartitionCount. > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics > for more detail. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
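A hedged sketch of how such a count could be computed (the PartitionState type and field names are hypothetical; the actual implementation lives in the broker, not in client code): among the partitions a broker leads, count those whose ISR has shrunk below min.insync.replicas.

{code:java}
import java.util.List;
import java.util.Map;

public class UnderMinIsrSketch {
    // Hypothetical per-partition view: isr is the current in-sync replica
    // set, minIsr is the effective min.insync.replicas for the topic.
    record PartitionState(List<Integer> isr, int minIsr) { }

    // Count, among partitions this broker leads, those whose ISR size is
    // below minIsr (e.g. RF=3, minIsr=2, only 1 replica left in ISR).
    static long underMinIsrPartitionCount(Map<String, PartitionState> ledPartitions) {
        return ledPartitions.values().stream()
                .filter(p -> p.isr().size() < p.minIsr())
                .count();
    }
}
{code}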
[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102758#comment-16102758 ] ASF GitHub Bot commented on KAFKA-5611: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3571 > One or more consumers in a consumer-group stop consuming after rebalancing > -- > > Key: KAFKA-5611 > URL: https://issues.apache.org/jira/browse/KAFKA-5611 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Panos Skianis >Assignee: Jason Gustafson > Labels: reliability > Fix For: 0.11.0.1 > > Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, > Server 2, Server 3 > > > Scenario: > - 3 zookeepers, 4 Kafka brokers on 0.10.2.0, with 0.9.0 compatibility still > on (other apps need it, but the app mentioned below is already on the Kafka > 0.10.2.0 client). > - 3 servers running 1 consumer each under the same consumer groupId. > - The servers consume messages happily until a timeout to an external > service causes our app to restart the Kafka consumer on one of the servers > (this is by design). That causes a rebalance of the group, and upon restart > one of the consumers seems to "block". > - Server 3 is where the problems occur. > - The problem fixes itself either by restarting one of the 3 servers or by > causing the group to rebalance again, e.g. by running the console consumer > with autocommit set to false under the same group. > > Note: > - Haven't managed to recreate it at will yet. > - Mainly happens in the production environment, often enough. Hence I do not > have any logs with DEBUG/TRACE statements yet. > - Extracts from the log of each app server are attached, along with the log > of the Kafka broker that seems to be handling the related group and > generations. > - See COMMENT lines in the files for further info. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
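For reference, a hedged Java equivalent of the rebalance workaround described above (broker address, topic, and group name are placeholders, not values from the report): joining the stuck group with auto-commit disabled forces the coordinator to rebalance it without committing any offsets.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebalanceNudge {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "the-stuck-group");          // placeholder
        // Do not commit offsets: we only want to join and trigger a rebalance.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("the-topic")); // placeholder
            consumer.poll(1000); // joining the group forces a rebalance
        } // closing the consumer triggers another rebalance as it leaves
    }
}
{code}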
[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102759#comment-16102759 ] Jason Gustafson commented on KAFKA-5611: [~pskianis] We have merged the patch above. We would appreciate it if you could confirm whether or not it fixes the issue. > One or more consumers in a consumer-group stop consuming after rebalancing > -- > > Key: KAFKA-5611 > URL: https://issues.apache.org/jira/browse/KAFKA-5611 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Panos Skianis >Assignee: Jason Gustafson > Labels: reliability > Fix For: 0.11.0.1 > > Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, > Server 2, Server 3 > > > Scenario: > - 3 zookeepers, 4 Kafka brokers on 0.10.2.0, with 0.9.0 compatibility still > on (other apps need it, but the app mentioned below is already on the Kafka > 0.10.2.0 client). > - 3 servers running 1 consumer each under the same consumer groupId. > - The servers consume messages happily until a timeout to an external > service causes our app to restart the Kafka consumer on one of the servers > (this is by design). That causes a rebalance of the group, and upon restart > one of the consumers seems to "block". > - Server 3 is where the problems occur. > - The problem fixes itself either by restarting one of the 3 servers or by > causing the group to rebalance again, e.g. by running the console consumer > with autocommit set to false under the same group. > > Note: > - Haven't managed to recreate it at will yet. > - Mainly happens in the production environment, often enough. Hence I do not > have any logs with DEBUG/TRACE statements yet. > - Extracts from the log of each app server are attached, along with the log > of the Kafka broker that seems to be handling the related group and > generations. > - See COMMENT lines in the files for further info. -- This message was sent by Atlassian JIRA (v6.4.14#64029)