[jira] [Comment Edited] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414673#comment-16414673 ] Cemalettin Koç edited comment on KAFKA-6711 at 3/27/18 7:13 AM: [~guozhang] Would you please check the implementation: https://github.com/cemo/kafka/commits/b-kafka-6711 I have done something preliminary. I will add the necessary tests as well after your guidance. was (Author: cemo): [~guozhang] Would you please check the implementation: [https://github.com/cemo/kafka/commit/0cb2482259fec897f396e8b84ffb1921c4f3f63e] I have done something preliminary. I will add the necessary tests as well after your guidance. > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for different consumers
Yuancheng PENG created KAFKA-6717: - Summary: TopicPartition Assigned twice to a consumer group for different consumers Key: KAFKA-6717 URL: https://issues.apache.org/jira/browse/KAFKA-6717 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.1 Reporter: Yuancheng PENG I'm using {{StickyAssignor}} for consuming more than 100 topics with a certain pattern. There are 10 consumers with the same group id. I expected each topic-partition to be assigned to only one consumer instance. However, some topic partitions are assigned to 2 different instances, hence the consumer group processes duplicate messages. {code:java} props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(StickyAssignor.class)); KafkaConsumer c = new KafkaConsumer<>(props); c.subscribe(Pattern.compile(TOPIC_PATTERN), new NoOpConsumerRebalanceListener()); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances
[ https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuancheng PENG updated KAFKA-6717: -- Summary: TopicPartition Assigned twice to a consumer group for 2 consumer instances (was: TopicPartition Assigned twice to a consumer group for different consumers ) > TopicPartition Assigned twice to a consumer group for 2 consumer instances > -- > > Key: KAFKA-6717 > URL: https://issues.apache.org/jira/browse/KAFKA-6717 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.1 >Reporter: Yuancheng PENG >Priority: Major > > I'm using {{StickyAssignor}} for consuming more than 100 topics with a certain > pattern. > There are 10 consumers with the same group id. > I expected each topic-partition to be assigned to only one consumer instance. > However, some topic partitions are assigned to 2 different instances, > hence the consumer group processes duplicate messages. > {code:java} > props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > Collections.singletonList(StickyAssignor.class)); > KafkaConsumer c = new KafkaConsumer<>(props); > c.subscribe(Pattern.compile(TOPIC_PATTERN), new > NoOpConsumerRebalanceListener()); > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
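To make a duplicate assignment visible, a diagnostic along the following lines can help. This is only a sketch, not part of the report; the bootstrap address, group id, and topic pattern are assumed values:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignmentLogger {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                Collections.singletonList(StickyAssignor.class));

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Pattern.compile("topic-.*"), new ConsumerRebalanceListener() { // assumed pattern
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Log what this instance owns after each rebalance so the assignments
                // can be compared across all consumers in the group.
                System.out.println("assigned: " + partitions);
            }
        });
        while (true) {
            consumer.poll(1000); // keep polling so the instance stays in the group
        }
    }
}
{code}

If the same {{TopicPartition}} shows up in the output of two live instances at the same time, the group is hitting the duplication described in the report.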
[jira] [Created] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams
Deepak Goyal created KAFKA-6718: --- Summary: Rack Aware Replica Task Assignment for Kafka Streams Key: KAFKA-6718 URL: https://issues.apache.org/jira/browse/KAFKA-6718 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 1.1.0 Reporter: Deepak Goyal |This feature enables replica tasks to be assigned to different racks. Replication factor = x Number of Replica tasks = x totalTasks = x+1 (replica + active) # If there are no racks provided: Cluster will behave rack-unaware # If the same rackId is given to all the nodes: Cluster will behave rack-unaware # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. each replica task is assigned to a different rack. # If (totalTasks < number of racks), then it will first assign tasks to different racks; further tasks will be assigned to the least loaded node, cluster wide.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
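The spreading rule described above can be read as follows. The sketch below is illustrative only; the node/rack bookkeeping types and the method are assumptions, not the actual Streams assignor code:

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the rule in the description: place each copy of a task on a
// distinct rack while copies remain, otherwise fall back to the least-loaded node
// cluster-wide. A single rack (or no rack ids) degenerates to rack-unaware placement.
public class RackSpreadSketch {
    static List<String> assignTaskCopies(int totalTasks, Map<String, List<String>> nodesByRack,
                                         Map<String, Integer> loadByNode) {
        List<String> chosenNodes = new ArrayList<>();
        List<String> racks = new ArrayList<>(nodesByRack.keySet());
        for (int copy = 0; copy < totalTasks; copy++) {
            if (copy < racks.size()) {
                // Rack-aware step: each copy goes to a different rack.
                String rack = racks.get(copy);
                chosenNodes.add(leastLoaded(nodesByRack.get(rack), loadByNode));
            } else {
                // More copies than racks: pick the least-loaded node cluster-wide.
                List<String> allNodes = new ArrayList<>();
                nodesByRack.values().forEach(allNodes::addAll);
                chosenNodes.add(leastLoaded(allNodes, loadByNode));
            }
        }
        return chosenNodes;
    }

    static String leastLoaded(List<String> nodes, Map<String, Integer> loadByNode) {
        return nodes.stream()
                .min(Comparator.comparingInt(n -> loadByNode.getOrDefault(n, 0)))
                .get();
    }
}
{code}

For example, with 3 racks and totalTasks = 2 the two copies land on two different racks; with totalTasks = 4 the fourth copy falls back to the least loaded node.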
[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deepak Goyal updated KAFKA-6718: Description: |Machines in a data centre are sometimes grouped in racks. Racks provide isolation as each rack may be in a different physical location and has its own power source. When tasks are properly replicated across racks, it provides fault tolerance in that if a rack goes down, the remaining racks can continue to serve traffic. This feature is already implemented in Kafka but we needed something similar for task assignments at the Kafka Streams application layer. This feature enables replica tasks to be assigned to different racks for fault-tolerance. NUM_STANDBY_REPLICAS = x totalTasks = x+1 (replica + active) # If no rackId is provided: Cluster will behave rack-unaware # If the same rackId is given to all the nodes: Cluster will behave rack-unaware # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. each replica task is assigned to a different rack. # If (totalTasks < number of racks), then it will first assign tasks to different racks; further tasks will be assigned to the least loaded node, cluster wide. We have added another config in StreamsConfig called "RACK_ID_CONFIG" which helps StickyPartitionAssignor to assign tasks in such a way that no two replica tasks are on the same rack if possible. After that it also helps to maintain stickiness within the rack.| was: |This feature enables replica tasks to be assigned to different racks. Replication factor = x Number of Replica tasks = x totalTasks = x+1 (replica + active) # If there are no racks provided: Cluster will behave rack-unaware # If the same rackId is given to all the nodes: Cluster will behave rack-unaware # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. each replica task is assigned to a different rack. # If (totalTasks < number of racks), then it will first assign tasks to different racks; further tasks will be assigned to the least loaded node, cluster wide.| > Rack Aware Replica Task Assignment for Kafka Streams > > > Key: KAFKA-6718 > URL: https://issues.apache.org/jira/browse/KAFKA-6718 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 1.1.0 >Reporter: Deepak Goyal >Priority: Major > > |Machines in a data centre are sometimes grouped in racks. Racks provide > isolation as each rack may be in a different physical location and has its > own power source. When tasks are properly replicated across racks, it > provides fault tolerance in that if a rack goes down, the remaining racks can > continue to serve traffic. > > This feature is already implemented in Kafka but we needed something similar for task > assignments at the Kafka Streams application layer. > > This feature enables replica tasks to be assigned to different racks for > fault-tolerance. > NUM_STANDBY_REPLICAS = x > totalTasks = x+1 (replica + active) > # If no rackId is provided: Cluster will behave rack-unaware > # If the same rackId is given to all the nodes: Cluster will behave rack-unaware > # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. > each replica task is assigned to a different rack. > # If (totalTasks < number of racks), then it will first assign tasks to > different racks; further tasks will be assigned to the least loaded node, cluster > wide. 
> We have added another config in StreamsConfig called "RACK_ID_CONFIG" which > helps StickyPartitionAssignor to assign tasks in such a way that no two > replica tasks are on the same rack if possible. > After that it also helps to maintain stickiness within the rack.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deepak Goyal updated KAFKA-6718: Description: |Machines in a data centre are sometimes grouped in racks. Racks provide isolation as each rack may be in a different physical location and has its own power source. When tasks are properly replicated across racks, it provides fault tolerance in that if a rack goes down, the remaining racks can continue to serve traffic. This feature is already implemented in Kafka [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] but we needed something similar for task assignments at the Kafka Streams application layer. This feature enables replica tasks to be assigned to different racks for fault-tolerance. NUM_STANDBY_REPLICAS = x totalTasks = x+1 (replica + active) # If no rackId is provided: Cluster will behave rack-unaware # If the same rackId is given to all the nodes: Cluster will behave rack-unaware # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. each replica task is assigned to a different rack. # If (totalTasks < number of racks), then it will first assign tasks to different racks; further tasks will be assigned to the least loaded node, cluster wide.| We have added another config in StreamsConfig called "RACK_ID_CONFIG" which helps StickyPartitionAssignor to assign tasks in such a way that no two replica tasks are on the same rack if possible. After that it also helps to maintain stickiness within the rack.| was: |Machines in a data centre are sometimes grouped in racks. Racks provide isolation as each rack may be in a different physical location and has its own power source. When tasks are properly replicated across racks, it provides fault tolerance in that if a rack goes down, the remaining racks can continue to serve traffic. This feature is already implemented in Kafka [KIP-36\|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] but we needed something similar for task assignments at the Kafka Streams application layer. This feature enables replica tasks to be assigned to different racks for fault-tolerance. NUM_STANDBY_REPLICAS = x totalTasks = x+1 (replica + active) # If no rackId is provided: Cluster will behave rack-unaware # If the same rackId is given to all the nodes: Cluster will behave rack-unaware # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. each replica task is assigned to a different rack. # If (totalTasks < number of racks), then it will first assign tasks to different racks; further tasks will be assigned to the least loaded node, cluster wide.| We have added another config in StreamsConfig called "RACK_ID_CONFIG" which helps StickyPartitionAssignor to assign tasks in such a way that no two replica tasks are on the same rack if possible. After that it also helps to maintain stickiness within the rack.| > Rack Aware Replica Task Assignment for Kafka Streams > > > Key: KAFKA-6718 > URL: https://issues.apache.org/jira/browse/KAFKA-6718 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 1.1.0 >Reporter: Deepak Goyal >Priority: Major > > |Machines in a data centre are sometimes grouped in racks. Racks provide > isolation as each rack may be in a different physical location and has its > own power source. When tasks are properly replicated across racks, it > provides fault tolerance in that if a rack goes down, the remaining racks can > continue to serve traffic. 
> > This feature is already implemented in Kafka > [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] > but we needed something similar for task assignments at the Kafka Streams application > layer. > > This feature enables replica tasks to be assigned to different racks for > fault-tolerance. > NUM_STANDBY_REPLICAS = x > totalTasks = x+1 (replica + active) > # If no rackId is provided: Cluster will behave rack-unaware > # If the same rackId is given to all the nodes: Cluster will behave rack-unaware > # If (totalTasks >= number of racks), then Cluster will be rack aware, i.e. > each replica task is assigned to a different rack. > # If (totalTasks < number of racks), then it will first assign tasks to > different racks; further tasks will be assigned to the least loaded node, cluster > wide.| > We have added another config in StreamsConfig called "RACK_ID_CONFIG" which > helps StickyPartitionAssignor to assign tasks in such a way that no two > replica tasks are on the same rack if possible. > After that it also helps to maintain stickiness within the rack.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6698) ConsumerBounceTest#testClose sometimes fails
[ https://issues.apache.org/jira/browse/KAFKA-6698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407577#comment-16407577 ] Ted Yu edited comment on KAFKA-6698 at 3/27/18 3:17 PM: Test output consisted of repeated occurrence of: {code} [2018-03-21 07:02:01,683] ERROR ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes (org.apache.zookeeper.server.ZooKeeperServer:472) [2018-03-21 07:02:02,693] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1162) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) [2018-03-21 07:02:03,794] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1162) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) {code} was (Author: yuzhih...@gmail.com): Test output consisted of repeated occurrence of: {code} [2018-03-21 07:02:01,683] ERROR ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes (org.apache.zookeeper.server.ZooKeeperServer:472) [2018-03-21 07:02:02,693] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1162) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) [2018-03-21 07:02:03,794] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1162) java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) {code} > ConsumerBounceTest#testClose sometimes fails > > > Key: KAFKA-6698 > URL: https://issues.apache.org/jira/browse/KAFKA-6698 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Priority: Minor > > Saw the following in > https://builds.apache.org/job/kafka-1.1-jdk7/94/testReport/junit/kafka.api/ConsumerBounceTest/testClose/ > : > {code} > org.apache.kafka.common.errors.TimeoutException: The consumer group command > timed out while waiting for group to initialize: > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6446. Resolution: Fixed Fix Version/s: 1.2.0 > KafkaProducer with transactionId endless waits when bootstrap server is down > > > Key: KAFKA-6446 > URL: https://issues.apache.org/jira/browse/KAFKA-6446 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Eduardo Sciullo >Assignee: huxihx >Priority: Critical > Fix For: 1.2.0 > > Attachments: Test.java > > > When bootstrap server is down, a KafkaProducer with transactionId endless > waits on initTransactions. > The timeouts don't apply to that operation: don't honor the > {{TRANSACTION_TIMEOUT_CONFIG.}} > Attached an example of my code to reproduce the scenario. > > I opened this issue as suggested by [Gary > Russell|https://stackoverflow.com/users/1240763/gary-russell] > [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415868#comment-16415868 ] ASF GitHub Bot commented on KAFKA-6446: --- hachikuji closed pull request #4563: KAFKA-6446: KafkaProducer should use timed version of `await` to avoid endless waiting URL: https://github.com/apache/kafka/pull/4563 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5fc9a1b9b38..a5af5b60093 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -256,6 +256,7 @@ private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; +private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -555,18 +556,36 @@ private static int parseAcks(String acksString) { * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * + * Note that this method will raise {@link TimeoutException} if the transactional state cannot + * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} + * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully + * initialized, this method should no longer be used. + * * @throws IllegalStateException if no transactional.id has been configured * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms. 
+ * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { throwIfNoTransactionManager(); -TransactionalRequestResult result = transactionManager.initializeTransactions(); -sender.wakeup(); -result.await(); +if (initTransactionsResult == null) { +initTransactionsResult = transactionManager.initializeTransactions(); +sender.wakeup(); +} + +try { +if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { +initTransactionsResult = null; +} else { +throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); +} +} catch (InterruptedException e) { +throw new InterruptException("Initialize transactions interrupted.", e); +} } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7eea4992b33..426b273b885 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) { return false; AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder(); -while (true) { +while (running) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index ff93da872dc..9c02e94c045 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -59,7 +59,10 @@ public void await() { }
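With the change above, {{initTransactions()}} is bounded by {{max.block.ms}} and, per the new Javadoc, safe to retry after a timeout. A usage sketch, where the broker address, transactional id, and retry policy are assumptions:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitTransactionsWithRetry {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");          // assumed id
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");                 // bound the wait

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        int attempts = 0;
        while (true) {
            try {
                producer.initTransactions();   // now fails with TimeoutException instead of hanging
                break;
            } catch (TimeoutException e) {
                // Safe to retry per the updated Javadoc; give up after a few attempts.
                if (++attempts >= 3) {
                    producer.close();
                    throw e;
                }
            }
        }
    }
}
{code}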
[jira] [Updated] (KAFKA-5540) Deprecate and remove internal converter configs
[ https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-5540: - Fix Version/s: 1.2.0 > Deprecate and remove internal converter configs > --- > > Key: KAFKA-5540 > URL: https://issues.apache.org/jira/browse/KAFKA-5540 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > The internal.key.converter and internal.value.converter were originally exposed > as configs because a) they are actually pluggable and b) providing a default > would require relying on the JsonConverter always being available, which > until we had classloader isolation it was possible might be removed for > compatibility reasons. > However, this has ultimately just caused a lot more trouble and confusion > than it is worth. We should deprecate the configs, give them a default of > JsonConverter (which is also kind of nice since it results in human-readable > data in the internal topics), and then ultimately remove them in the next > major version. > These are all public APIs so this will need a small KIP before we can make > the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-5540) Deprecate and remove internal converter configs
[ https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-5540: Assignee: Chris Egerton > Deprecate and remove internal converter configs > --- > > Key: KAFKA-5540 > URL: https://issues.apache.org/jira/browse/KAFKA-5540 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > Fix For: 1.2.0 > > > The internal.key.converter and internal.value.converter were original exposed > as configs because a) they are actually pluggable and b) providing a default > would require relying on the JsonConverter always being available, which > until we had classloader isolation it was possible might be removed for > compatibility reasons. > However, this has ultimately just caused a lot more trouble and confusion > than it is worth. We should deprecate the configs, give them a default of > JsonConverter (which is also kind of nice since it results in human-readable > data in the internal topics), and then ultimately remove them in the next > major version. > These are all public APIs so this will need a small KIP before we can make > the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415952#comment-16415952 ] Chris Egerton commented on KAFKA-6417: -- Just detecting a JAR file on a {{plugin.path}} directory isn't sufficient cause for alarm, since uber JARs are supported for plugins. An alternative could be to log a warning if a JAR file is detected in a {{plugin.path}} directory that doesn't contain any plugins. The warning could read something like "Archive file in plugin path directory does not contain any recognizable plugins and will not be used, even as a dependency for other plugins in the same directory." Not completely in love with that wording/criteria; in the event that your {{plugin.path}} looks like {{/plugin/path,plugin/path/plugin1}}, non-plugin JARs found in the {{plugin/path/plugin1}} directory will then be incorrectly flagged even though they can be used as dependencies for {{plugin1}} since it is correctly formatted for use as a plugin in the {{/plugin/path}} directory. [~cotedm], thoughts? > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
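To restate the documented expectation with a concrete layout (the paths below are made-up examples, not from the report):

{code}
# Correct: plugin.path lists the parent directory; each plugin gets its own subdirectory
plugin.path=/usr/share/plugins

/usr/share/plugins/myplugin1/myplugin1.jar
/usr/share/plugins/myplugin1/some-dependency.jar
/usr/share/plugins/myplugin2/myplugin2-uber.jar

# Problematic: plugin.path points inside a single plugin's directory, so each jar in it
# is treated as a standalone plugin and myplugin1.jar no longer sees its sibling
# dependency jars, leading to the ClassNotFoundException described in this issue
plugin.path=/usr/share/plugins/myplugin1
{code}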
[jira] [Assigned] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-6718: Assignee: Deepak Goyal > Rack Aware Replica Task Assignment for Kafka Streams > > > Key: KAFKA-6718 > URL: https://issues.apache.org/jira/browse/KAFKA-6718 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 1.1.0 >Reporter: Deepak Goyal >Assignee: Deepak Goyal >Priority: Major > > |Machines in data centre are sometimes grouped in racks. Racks provide > isolation as each rack may be in a different physical location and has its > own power source. When tasks are properly replicated across racks, it > provides fault tolerance in that if a rack goes down, the remaining racks can > continue to serve traffic. > > This feature is already implemented at Kafka > [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] > but we needed similar for task assignments at Kafka Streams Application > layer. > > This features enables replica tasks to be assigned on different racks for > fault-tolerance. > NUM_STANDBY_REPLICAS = x > totalTasks = x+1 (replica + active) > # If there are no rackID provided: Cluster will behave rack-unaware > # If same rackId is given to all the nodes: Cluster will behave rack-unaware > # If (totalTasks >= number of racks), then Cluster will be rack aware i.e. > each replica task is each assigned to a different rack. > # Id (totalTasks < number of racks), then it will first assign tasks on > different racks, further tasks will be assigned to least loaded node, cluster > wide.| > We have added another config in StreamsConfig called "RACK_ID_CONFIG" which > helps StickyPartitionAssignor to assign tasks in such a way that no two > replica tasks are on same rack if possible. > Post that it also helps to maintain stickyness with-in the rack.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415967#comment-16415967 ] Guozhang Wang commented on KAFKA-6718: -- [~_deepakgoyal] thanks for creating the KIP! I've assigned the JIRA to you. About the KIP itself, please note that if you are enhancing the rebalance protocol to encode the rack id information, these two KIPs are correlated and I'd recommend you read about them first: Any protocol changes will need to consider a smooth upgrade path: https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade We'd like to encode some other information into the metadata to enhance partition assignor's workload balance awareness: https://cwiki.apache.org/confluence/display/KAFKA/KIP-262%3A+Metadata+should+include+number+of+state+stores+for+task > Rack Aware Replica Task Assignment for Kafka Streams > > > Key: KAFKA-6718 > URL: https://issues.apache.org/jira/browse/KAFKA-6718 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 1.1.0 >Reporter: Deepak Goyal >Assignee: Deepak Goyal >Priority: Major > > |Machines in data centre are sometimes grouped in racks. Racks provide > isolation as each rack may be in a different physical location and has its > own power source. When tasks are properly replicated across racks, it > provides fault tolerance in that if a rack goes down, the remaining racks can > continue to serve traffic. > > This feature is already implemented at Kafka > [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] > but we needed similar for task assignments at Kafka Streams Application > layer. > > This features enables replica tasks to be assigned on different racks for > fault-tolerance. > NUM_STANDBY_REPLICAS = x > totalTasks = x+1 (replica + active) > # If there are no rackID provided: Cluster will behave rack-unaware > # If same rackId is given to all the nodes: Cluster will behave rack-unaware > # If (totalTasks >= number of racks), then Cluster will be rack aware i.e. > each replica task is each assigned to a different rack. > # Id (totalTasks < number of racks), then it will first assign tasks on > different racks, further tasks will be assigned to least loaded node, cluster > wide.| > We have added another config in StreamsConfig called "RACK_ID_CONFIG" which > helps StickyPartitionAssignor to assign tasks in such a way that no two > replica tasks are on same rack if possible. > Post that it also helps to maintain stickyness with-in the rack.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6127: --- Description: Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block (fixed in KAFKA-6446) and we should double-check that the code handles this case correctly. If we block within one operation, the whole {{StreamThread}} would block, and the instance does not make any progress, becomes unresponsive (for example, {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer group. We might consider using {{wakeup()}} calls to unblock those operations to keep {{StreamThread}} in a responsive state. Note: there are discussions about adding timeouts to those calls, and thus, we could get {{TimeoutExceptions}}. This would be easier to handle than using {{wakeup()}}. Thus, we should keep an eye on those discussions. was: Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. If we block within one operation, the whole {{StreamThread}} would block, and the instance does not make any progress, becomes unresponsive (for example, {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer group. We might consider using {{wakeup()}} calls to unblock those operations to keep {{StreamThread}} in a responsive state. Note: there are discussions about adding timeouts to those calls, and thus, we could get {{TimeoutExceptions}}. This would be easier to handle than using {{wakeup()}}. Thus, we should keep an eye on those discussions. > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Major > > Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, > {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. > If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block > (fixed in KAFKA-6446) and we should double-check that the code handles this > case correctly. > If we block within one operation, the whole {{StreamThread}} would block, and > the instance does not make any progress, becomes unresponsive (for example, > {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer > group. > We might consider using {{wakeup()}} calls to unblock those operations to > keep {{StreamThread}} in a responsive state. > Note: there are discussions about adding timeouts to those calls, and thus, we could > get {{TimeoutExceptions}}. This would be easier to handle than using > {{wakeup()}}. Thus, we should keep an eye on those discussions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
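As a minimal illustration of the {{wakeup()}} idea (not the Streams implementation; the watchdog thread and the timeout bound are assumptions), a second thread can abort a blocked consumer call so the caller stays responsive:

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class BoundedCommit {
    // Commit with an upper bound: if commitSync() has not returned within the timeout,
    // another thread calls wakeup() (the only thread-safe KafkaConsumer method) and the
    // blocked call aborts with WakeupException.
    static void commitWithTimeout(KafkaConsumer<?, ?> consumer, long timeoutMs) {
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> wake = watchdog.schedule(consumer::wakeup, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            consumer.commitSync();            // can otherwise block indefinitely
        } catch (WakeupException e) {
            // The commit was aborted; the calling thread is responsive again and can
            // decide whether to retry, rejoin the group, or shut down cleanly.
        } finally {
            wake.cancel(false);               // note: a real implementation must also handle the
            watchdog.shutdown();              // race where wakeup() fires after commitSync() returns
        }
    }
}
{code}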
[jira] [Updated] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6446: --- Labels: exactly-once (was: ) > KafkaProducer with transactionId endless waits when bootstrap server is down > > > Key: KAFKA-6446 > URL: https://issues.apache.org/jira/browse/KAFKA-6446 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Eduardo Sciullo >Assignee: huxihx >Priority: Critical > Labels: exactly-once > Fix For: 1.2.0 > > Attachments: Test.java > > > When bootstrap server is down, a KafkaProducer with transactionId endless > waits on initTransactions. > The timeouts don't apply to that operation: don't honor the > {{TRANSACTION_TIMEOUT_CONFIG.}} > Attached an example of my code to reproduce the scenario. > > I opened this issue as suggested by [Gary > Russell|https://stackoverflow.com/users/1240763/gary-russell] > [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6127) Streams should never block infinitely
[ https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6127: --- Labels: exactly-once (was: ) > Streams should never block infinitely > - > > Key: KAFKA-6127 > URL: https://issues.apache.org/jira/browse/KAFKA-6127 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: exactly-once > > Streams uses three consumer APIs that can block infinite: {{commitSync()}}, > {{committed()}}, and {{position()}}. Also {{KafkaProducer#send()}} can block. > If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block > (fixed in KAFKA-6446) and we should double check the code if we handle this > case correctly. > If we block within one operation, the whole {{StreamThread}} would block, and > the instance does not make any progress, becomes unresponsive (for example, > {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer > group. > We might consider to use {{wakeup()}} calls to unblock those operations to > keep {{StreamThread}} in a responsive state. > Note: there are discussion to add timeout to those calls, and thus, we could > get {{TimeoutExceptions}}. This would be easier to handle than using > {{wakeup()}}. Thus, we should keep an eye on those discussions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6718: --- Labels: needs-kip (was: ) > Rack Aware Replica Task Assignment for Kafka Streams > > > Key: KAFKA-6718 > URL: https://issues.apache.org/jira/browse/KAFKA-6718 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 1.1.0 >Reporter: Deepak Goyal >Assignee: Deepak Goyal >Priority: Major > Labels: needs-kip > > |Machines in data centre are sometimes grouped in racks. Racks provide > isolation as each rack may be in a different physical location and has its > own power source. When tasks are properly replicated across racks, it > provides fault tolerance in that if a rack goes down, the remaining racks can > continue to serve traffic. > > This feature is already implemented at Kafka > [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment] > but we needed similar for task assignments at Kafka Streams Application > layer. > > This features enables replica tasks to be assigned on different racks for > fault-tolerance. > NUM_STANDBY_REPLICAS = x > totalTasks = x+1 (replica + active) > # If there are no rackID provided: Cluster will behave rack-unaware > # If same rackId is given to all the nodes: Cluster will behave rack-unaware > # If (totalTasks >= number of racks), then Cluster will be rack aware i.e. > each replica task is each assigned to a different rack. > # Id (totalTasks < number of racks), then it will first assign tasks on > different racks, further tasks will be assigned to least loaded node, cluster > wide.| > We have added another config in StreamsConfig called "RACK_ID_CONFIG" which > helps StickyPartitionAssignor to assign tasks in such a way that no two > replica tasks are on same rack if possible. > Post that it also helps to maintain stickyness with-in the rack.| -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6719) Kafka Reassign Partitions Failure
Srinivas Dhruvakumar created KAFKA-6719: --- Summary: Kafka Reassign Partitions Failure Key: KAFKA-6719 URL: https://issues.apache.org/jira/browse/KAFKA-6719 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.11.0.2 Reporter: Srinivas Dhruvakumar Attachments: Screen Shot 2018-03-27 at 10.27.29 AM.png Kafka partition reassignment fails with the following error: !Screen Shot 2018-03-27 at 10.27.29 AM.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6719) Kafka Reassign Partitions Failure
[ https://issues.apache.org/jira/browse/KAFKA-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Srinivas Dhruvakumar resolved KAFKA-6719. - Resolution: Invalid > Kafka Reassign Partitions Failure > -- > > Key: KAFKA-6719 > URL: https://issues.apache.org/jira/browse/KAFKA-6719 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.2 >Reporter: Srinivas Dhruvakumar >Priority: Major > Attachments: Screen Shot 2018-03-27 at 10.27.29 AM.png > > > The Kafka reassign partition fails with the following error > !Screen Shot 2018-03-27 at 10.27.29 AM.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416143#comment-16416143 ] Dustin Cote commented on KAFKA-6417: [~ChrisEgerton] I think the problem with this is that in practice you could have so many jars that also match this criteria that the log becomes chatty and the message gets missed (this happens a lot with the ProducerConfig and ConsumerConfig warnings that pop out). There would probably be a lot of false positives here. One thought I have is to enforce an easy to understand policy that no jars are allowed at all directly under {{plugin.path}} and just crash the worker saying so to "fail fast" in the event someone isn't following convention. That would require uber jar users to make a top level directory to contain their jar which may be a little annoying but would be easy to understand. This doesn't seem like a severe penalty in general considering the real usage pattern but would absolutely have compatibility issues with what exists out there today (on upgrade you may see trouble). Alternatively, failing an individual connector might work too, but I don't know how that would go in practice. > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416164#comment-16416164 ] ASF GitHub Bot commented on KAFKA-6711: --- cemo opened a new pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-mem… URL: https://github.com/apache/kafka/pull/4782 …ory stores in checkpoint file *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416168#comment-16416168 ] Cemalettin Koç commented on KAFKA-6711: --- Sorry for opening the issue too quickly :( The bot polluted the issue. Here is my quick shot at the issue. Please note that I am pretty new to Kafka land. Review twice :) https://github.com/apache/kafka/pull/4782 > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
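For readers following along, the shape of the guard being discussed is roughly the following. This is a sketch under the assumption that {{StateStore#persistent()}} is the distinguishing check; it is not the patch in the linked PR, and the method and parameter names are illustrative:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

// Sketch of the proposed guard: only persistent stores contribute offsets to the
// global checkpoint file, so in-memory global stores are re-read from the topic
// on restart instead of being silently skipped.
public final class CheckpointGuardSketch {
    public static Map<TopicPartition, Long> checkpointableOffsets(
            Map<TopicPartition, Long> currentOffsets,
            Map<TopicPartition, StateStore> storeByPartition) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : currentOffsets.entrySet()) {
            StateStore store = storeByPartition.get(entry.getKey());
            // Skip in-memory stores: persisting their offsets would make the global
            // thread skip restoring them after a restart, losing their data.
            if (store != null && store.persistent()) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
{code}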
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416174#comment-16416174 ] Chris Egerton commented on KAFKA-6417: -- There's also the case where a plugin performs some operations either statically or, in the case of a connector, in its no-args constructor, in which case the resulting {{ClassNotFoundException}} is thrown while starting up connect (in either distributed or standalone mode), and causes the framework to crash. If this happens, there's no guarantee that any of the non-plugin-containing JARs would be scanned before the plugin JAR itself is scanned and the framework-halting exception is thrown, so we can't rely on the user having seen any warnings beforehand about non-plugin-containing JARs not being used as dependencies for other plugins. Handling this separate case could involve intercepting the {{ClassNotFoundException}} and outputting a similar warning message about improper plugin path structure, before throwing the same exception and causing the framework to halt like before. > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416174#comment-16416174 ] Chris Egerton edited comment on KAFKA-6417 at 3/27/18 8:18 PM: --- Sorry, started working on this comment before I saw yours, [~cotedm]; I'll address your points now in a separate one. There's also the case where a plugin performs some operations either statically or, in the case of a connector, in its no-args constructor, in which case the resulting {{ClassNotFoundException}} is thrown while starting up connect (in either distributed or standalone mode), and causes the framework to crash. If this happens, there's no guarantee that any of the non-plugin-containing JARs would be scanned before the plugin JAR itself is scanned and the framework-halting exception is thrown, so we can't rely on the user having seen any warnings beforehand about non-plugin-containing JARs not being used as dependencies for other plugins. Handling this separate case could involve intercepting the {{ClassNotFoundException}} and outputting a similar warning message about improper plugin path structure, before throwing the same exception and causing the framework to halt like before. was (Author: chrisegerton): There's also the case where a plugin performs some operations either statically or, in the case of a connector, in its no-args constructor, in which case the resulting {{ClassNotFoundException}} is thrown while starting up connect (in either distributed or standalone mode), and causes the framework to crash. If this happens, there's no guarantee that any of the non-plugin-containing JARs would be scanned before the plugin JAR itself is scanned and the framework-halting exception is thrown, so we can't rely on the user having seen any warnings beforehand about non-plugin-containing JARs not being used as dependencies for other plugins. Handling this separate case could involve intercepting the {{ClassNotFoundException}} and outputting a similar warning message about improper plugin path structure, before throwing the same exception and causing the framework to halt like before. > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416201#comment-16416201 ] Chris Egerton commented on KAFKA-6417: -- Curious--why do you think false positives would be a big issue? Do most users (that you know of and/or that have encountered this issue) have nested directories included in their {{plugin.path}} (e.g., {{plugin.path=foo/bar,foo/bar/baz}}) and/or a bunch of unnecessary JARs lying around? If we make it clear that we're only warning that the JAR we've encountered won't be used as a dependency for plugin JARs in the same directory, the warning can be safely ignored by anyone reading it who isn't encountering an issue instantiating a given plugin but will be useful for anyone who is, as long as they don't have nested directories in their {{plugin.path}}. To be clear, your suggestion for altering the plugin scanning behavior is essentially that we no longer differentiate uber JARs from other JARs and place all JAR files for a plugin in a directory underneath a directory supplied as a plugin path, correct? Can ask around to see what people think about that before writing up a KIP but it seems like a bit of a nuclear option given the compatibility issues you mentioned. RE: failing individual plugins: Have to dig into the code base more to verify this, but that could be as simple as adding another {{catch}} block for the {{ClassNotFoundException}} that gets thrown under these circumstances and outputting an appropriate error message about not being able to use the connector, similar to [what is already done|https://github.com/apache/kafka/blob/395c7e0f0985b424ea2bc2bd40c0237eada24dcf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L183-L189] in the plugin scanning phase for Connect. Thoughts? > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
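For illustration, a minimal sketch of the "fail only the offending plugin" idea discussed in the comment above. This is not the actual DelegatingClassLoader code; the helper shape and messages are assumptions.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PluginLoadSketch {
    private static final Logger log = LoggerFactory.getLogger(PluginLoadSketch.class);

    // Try to load a single plugin class; on ClassNotFoundException, log a hint about
    // improper plugin.path structure and skip this plugin instead of halting the worker.
    static Class<?> tryLoadPlugin(ClassLoader pluginLoader, String className) {
        try {
            return Class.forName(className, false, pluginLoader);
        } catch (ClassNotFoundException e) {
            log.error("Could not load plugin class {}. If plugin.path points directly at a "
                    + "plugin's own directory instead of its parent, the plugin's dependency "
                    + "JARs are not visible to it (see KAFKA-6417).", className, e);
            return null; // the caller skips this plugin and continues scanning the others
        }
    }
}
{code}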
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416232#comment-16416232 ] Dustin Cote commented on KAFKA-6417: [~ChrisEgerton] yeah it's a bit extreme probably to kill the whole worker, so just failing the non-conforming connector is better IMO. However, the convention today is pretty convoluted and I think the pain of breaking compatibility (for some major release) isn't so bad. We do have lots of users with polluted classpaths, especially coming from a hadoop world where they just have hundreds of jars on their system that they don't really manage themselves. The original report of this actually came from that type of situation and once you're there it's hard to figure out what's going on. Adding more stuff in the log might help if you know what you are looking for a priori but coming at it from just a ClassNotFoundException, you have hundreds of jars to sift through the point gets lost a bit. A user may be thinking "if I'm missing a class why do I have this repeated message that these JARs won't be used as a dependency?" This is a more complex problem for a user to solve than, "my worker said it went down because I have JARs where they shouldn't be". The corrective action for the former is pretty unclear, but the latter I think the action is clear and if a class isn't found in that case you know what directory to look in. > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6473) Add MockProcessorContext to public test-utils
[ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416246#comment-16416246 ] ASF GitHub Bot commented on KAFKA-6473: --- mjsax closed pull request #4736: KAFKA-6473: Add MockProcessorContext to public test-utils URL: https://github.com/apache/kafka/pull/4736 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 8de03efeba0..d5fd7d5fd7b 100644 --- a/build.gradle +++ b/build.gradle @@ -1002,6 +1002,10 @@ project(':streams:examples') { compile project(':streams') compile project(':connect:json') // this dependency should be removed after we unify data API compile libs.slf4jlog4j + +testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest +testCompile project(':streams:test-utils') +testCompile libs.junit } javadoc { diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 34ac89ffe05..8552bcc8674 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -66,6 +66,7 @@ Writing streams back to Kafka +Testing a Streams application @@ -3154,6 +3155,10 @@ Overview +Testing a Streams application +Kafka Streams comes with a test-utils module to help you test your application here. + diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index b51bc22cfe2..e3432b79b7c 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -41,13 +41,16 @@ Table of Contents Overview -Defining a Stream Processor -State Stores -Defining and creating a State Store -Fault-tolerant State Stores -Enable or Disable Fault Tolerance of State Stores (Store Changelogs) -Implementing Custom State Stores - +Defining a Stream +Processor +Unit Testing Processors +State Stores + +Defining and creating a State Store +Fault-tolerant State Stores +Enable or Disable Fault Tolerance of State Stores (Store Changelogs) +Implementing Custom State Stores + Connecting Processors and State Stores @@ -98,11 +101,12 @@ OverviewPunctuationType types within the same processor by calling ProcessorContext#schedule() multiple times inside init() method. -Attention +Attention Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate(). +Example The following example Processor defines a simple word-count algorithm and the following actions are performed: In the init() method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name “Counts”. @@ -159,6 +163,16 @@ Overviewstate stores documentation. 
+ + +Unit Testing Processors + + + +Kafka Streams comes with a test-utils module to help you write unit tests for your +processors here. + + State Stores To implement a stateful Processor or Transformer, you must provide one or more state stores to the processor diff --git a/docs/streams/developer-guide/testing.html b/docs/streams/developer-guide/testing.html index e6886a1689f..ea2ae987c7e 100644 --- a/docs/streams/developer-guide/testing.html +++ b/docs/streams/developer-guide/testing.
[jira] [Resolved] (KAFKA-6473) Add MockProcessorContext to public test-utils
[ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6473. Resolution: Fixed Fix Version/s: 1.2.0 > Add MockProcessorContext to public test-utils > - > > Key: KAFKA-6473 > URL: https://issues.apache.org/jira/browse/KAFKA-6473 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Major > Labels: needs-kip, user-experience > Fix For: 1.2.0 > > > With KIP-247, we added public test-utils artifact with a TopologyTestDriver > class. Using the test driver for a single > Processor/Transformer/ValueTransformer it's required to specify a whole > topology with source and sink and plus the > Processor/Transformer/ValueTransformer into it. > For unit testing, it might be more convenient to have a MockProcessorContext, > that can be used to test the Processor/Transformer/ValueTransformer in > isolation. Ie, the test itself creates new > Processor/Transformer/ValueTransformer object and calls init() manually > passing in the MockProcessorContext. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals -- This message was sent by Atlassian JIRA (v7.6.3#76005)
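A rough usage sketch of what the ticket describes: creating the Processor directly in the test and passing in the mock context. The MockProcessorContext/CapturedForward method names below reflect the test-utils class added by this ticket, but check the released javadocs; the processor itself is a made-up example.

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.MockProcessorContext;

public class WordLengthProcessorTest {

    // Trivial processor used only for this illustration.
    static class WordLengthProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            context().forward(key, String.valueOf(value.length()));
        }
    }

    public void shouldForwardWordLength() {
        // No topology, no TopologyTestDriver: init() is called manually with the mock context.
        final MockProcessorContext context = new MockProcessorContext();
        final WordLengthProcessor processor = new WordLengthProcessor();
        processor.init(context);

        processor.process("key", "hello");

        // The mock context captures everything the processor forwarded downstream.
        context.forwarded().forEach(forward -> System.out.println(forward.keyValue()));
    }
}
{code}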
[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416281#comment-16416281 ] Vikas Tikoo commented on KAFKA-5882: We have encountered this issue 19 times in the last 15 days, and always around redeploys. Running kafka-streams v0.11.0.0. > NullPointerException in StreamTask > -- > > Key: KAFKA-5882 > URL: https://issues.apache.org/jira/browse/KAFKA-5882 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Major > > It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] > is made, but introduce some other issue. > In some cases (I am not sure which ones) I got NPE (below). > I would expect that even in case of FATAL error anythink except NPE is thrown. > {code} > 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener > for group streamer failed on partition assignment > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > [myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) > [myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > [myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) > [myapp-streamer.jar:?] > 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread > [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down > 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with > timeoutMillis = 9223372036854775807 ms. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6720) Inconsistent Kafka Streams behaviour when topic does not exist
Daniel Wojda created KAFKA-6720: --- Summary: Inconsistent Kafka Streams behaviour when topic does not exist Key: KAFKA-6720 URL: https://issues.apache.org/jira/browse/KAFKA-6720 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.1 Reporter: Daniel Wojda When Kafka Streams starts, it reads metadata about the topics used in the topology and their partitions. If the topology of that stream contains a stateful operation like #join and a topic does not exist, [TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719] will be thrown. In case of a stream with a simple topology containing only stateless operations, like #mapValue, and a topic that does not exist, Kafka Streams does not throw any exception; it just logs a warning: ["log.warn("No partitions found for topic {}", topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435] I believe the behaviour of Kafka Streams in both cases should be the same, and it should throw TopologyBuilderException. I am more than happy to prepare a Pull Request if it is a valid issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6720) Inconsistent Kafka Streams behaviour when topic does not exist
[ https://issues.apache.org/jira/browse/KAFKA-6720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mariam John resolved KAFKA-6720. Resolution: Duplicate This is similar to KAFKA-6437. > Inconsistent Kafka Streams behaviour when topic does not exist > -- > > Key: KAFKA-6720 > URL: https://issues.apache.org/jira/browse/KAFKA-6720 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Daniel Wojda >Priority: Minor > > When Kafka Streams starts it reads metadata about topics used in topology > and it's partitions. If topology of that stream contains stateful operation > like #join, and a topic does not exist > [TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719] > will be thrown. > In case of streams with simple topology with stateless operations only, like > #mapValue, and topic does not exist, Kafka Streams does not throw any > exception, just logs a warning: > ["log.warn("No partitions found for topic {}", > topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435] > > I believe the behaviour of Kafka Streams in both cases should be the same, > and it should throw TopologyBuilderException. > I am more than happy to prepare a Pull Request if it is a valid issue. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416382#comment-16416382 ] Srinivas Dhruvakumar commented on KAFKA-6649: - I am trying out the patch "high watermark could be incorrectly set to -1". But I am unable to reproduce the above scenario " : org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 " Does anyone know how to reproduce the above error ? > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replication partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like that after the ReplicaFetcherThread is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. 
Further examining, the ShutdownableThread.scala object: > {noformat} > override def run(): Unit = { > info("Starting") > try { >while (isRunning) > doWork() > } catch { >case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) >case e: Throwable => > if (isRunning) >error("Error due to", e) > } finally { >shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. I am not sure whether this is the intended behavior > of the ShutdownableThread, or the exception should be caught and we should > keep calling doWork() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
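To make the question at the end of the quoted report concrete, here is a Java-flavoured sketch (the real ShutdownableThread is Scala) of the alternative being asked about: catching non-fatal errors inside the loop so that doWork() keeps being called. Whether that is the desired semantics is exactly what the ticket raises; the FatalExitError handling of the real class is omitted here.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: a loop that logs non-fatal errors and keeps working, instead of
// letting a single Throwable silently terminate the fetcher thread.
abstract class ResilientLoopThread extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    abstract void doWork() throws Exception;

    @Override
    public void run() {
        try {
            while (running.get()) {
                try {
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;                                   // treat interruption as shutdown
                } catch (Throwable t) {
                    System.err.println("Error due to " + t); // log and continue calling doWork()
                }
            }
        } finally {
            shutdownComplete.countDown();
        }
    }

    void initiateShutdown() {
        running.set(false);
    }
}
{code}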
[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416425#comment-16416425 ] Matthias J. Sax commented on KAFKA-5882: Thanks for reporting this. Can you upgrade to 0.11.0.2 or even better 1.0.1 ? Also note, that 1.1 should be release shortly. > NullPointerException in StreamTask > -- > > Key: KAFKA-5882 > URL: https://issues.apache.org/jira/browse/KAFKA-5882 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Major > > It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] > is made, but introduce some other issue. > In some cases (I am not sure which ones) I got NPE (below). > I would expect that even in case of FATAL error anythink except NPE is thrown. > {code} > 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener > for group streamer failed on partition assignment > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) > ~[myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) > [myapp-streamer.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > [myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) > [myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > [myapp-streamer.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) > [myapp-streamer.jar:?] > 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread > [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down > 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with > timeoutMillis = 9223372036854775807 ms. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend
[ https://issues.apache.org/jira/browse/KAFKA-6716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416428#comment-16416428 ] ASF GitHub Bot commented on KAFKA-6716: --- huxihx opened a new pull request #4783: KAFKA-6716: Should close the `discardChannel` in completeSend URL: https://github.com/apache/kafka/pull/4783 KAFKA-6716: Should close the `discardChannel` in completeSend https://issues.apache.org/jira/browse/KAFKA-6716 Should close the `discardChannel` in MockSelector#completeSend *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > discardChannel should be released in MockSelector#completeSend > -- > > Key: KAFKA-6716 > URL: https://issues.apache.org/jira/browse/KAFKA-6716 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Priority: Minor > > {code} > private void completeSend(Send send) throws IOException { > // Consume the send so that we will be able to send more requests to > the destination > ByteBufferChannel discardChannel = new ByteBufferChannel(send.size()); > while (!send.completed()) { > send.writeTo(discardChannel); > } > completedSends.add(send); > } > {code} > The {{discardChannel}} should be closed before returning from the method -- This message was sent by Atlassian JIRA (v7.6.3#76005)
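One possible shape of the fix, as a sketch rather than the actual patch in the PR above: release the channel before returning, e.g. with try-with-resources (assuming ByteBufferChannel is Closeable via GatheringByteChannel).

{code:java}
private void completeSend(Send send) throws IOException {
    // Consume the send so that we will be able to send more requests to the destination,
    // and make sure the temporary discard channel is closed once the send is drained.
    try (ByteBufferChannel discardChannel = new ByteBufferChannel(send.size())) {
        while (!send.completed()) {
            send.writeTo(discardChannel);
        }
        completedSends.add(send);
    }
}
{code}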
[jira] [Assigned] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend
[ https://issues.apache.org/jira/browse/KAFKA-6716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx reassigned KAFKA-6716: - Assignee: huxihx > discardChannel should be released in MockSelector#completeSend > -- > > Key: KAFKA-6716 > URL: https://issues.apache.org/jira/browse/KAFKA-6716 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: huxihx >Priority: Minor > > {code} > private void completeSend(Send send) throws IOException { > // Consume the send so that we will be able to send more requests to > the destination > ByteBufferChannel discardChannel = new ByteBufferChannel(send.size()); > while (!send.completed()) { > send.writeTo(discardChannel); > } > completedSends.add(send); > } > {code} > The {{discardChannel}} should be closed before returning from the method -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6721) Consolidate state store management for global stores and normal stores
Guozhang Wang created KAFKA-6721: Summary: Consolidate state store management for global stores and normal stores Key: KAFKA-6721 URL: https://issues.apache.org/jira/browse/KAFKA-6721 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Today the internal logic for handling global state store restoration and update, and for normal store restoration, is separated into two sets of classes. Hence whenever we update the logic for one of them we need to do the same for the other, which we may easily forget, causing regressions. As a tech debt cleanup we should consider consolidating the logic of global state stores into `StateRestorer` and `StoreChangelogReader` if possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416438#comment-16416438 ] Chris Egerton commented on KAFKA-6417: -- Alright, given that insight I agree that issuing warnings for unused JARs would just end up flooding logs with often-unnecessary messages. To clarify, do you think it'd make more sense at this point to pursue changing the plugin structure or to catch the {{ClassNotFoundException}} caused by improper structure and add an error message about said improper structure before failing the connector? Open to either at this point, although still leaning away from changing plugin structure a small amount. > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter
[ https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416444#comment-16416444 ] ASF GitHub Bot commented on KAFKA-6386: --- guozhangwang closed pull request #4354: KAFKA-6386:use Properties instead of StreamsConfig in KafkaStreams constructor URL: https://github.com/apache/kafka/pull/4354 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index baf9633a0c3..464854c57ad 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -86,7 +86,11 @@ Streams API to let users specify inner serdes if the default serde classes are windowed serdes. For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs";>KIP-265. / - + + + We have deprecated StreamsConfig in KafkaStreams constructors. Now we only take in java.util.Properties since StreamsConfig is immutable and is created from a Properties object itself. +For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor";>KIP-245. + Kafka 1.2.0 allows to manipulate timestamps of output records using the Processor API (https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API";>KIP-251). To enable this new feature, ProcessorContext#forward(...) was modified. diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java index 2ea5218d647..8a6ec05b4ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java @@ -26,7 +26,7 @@ /** * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to a {@link KafkaStreams} instance. * - * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig, KafkaClientSupplier) + * @see KafkaStreams#KafkaStreams(Topology, java.util.Properties, KafkaClientSupplier) */ public interface KafkaClientSupplier { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 1a70e4638ef..186276c22d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -515,31 +515,18 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store } /** - * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead + * Create a {@code KafkaStreams} instance. + * + * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance, + * you still must {@link #close()} it to avoid resource leaks. 
+ * + * @param topology the topology specifying the computational logic + * @param propsproperties for {@link StreamsConfig} + * @throws StreamsException if any fatal error occurs */ -@Deprecated -public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder, +public KafkaStreams(final Topology topology, final Properties props) { -this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier()); -} - -/** - * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead - */ -@Deprecated -public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder, -final StreamsConfig config) { -this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier()); -} - -/** - * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead - */ -@Deprecated -public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder, -final StreamsConfig config, -final KafkaClientSupplier clientSupplier) { -this(builder.internalTopologyBuilder, config, clientSupplier); +this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier()); } /** @@ -548,13 +535,16 @@ public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder bui * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance, * you still mu
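For reference, the user-facing effect of the change in this PR is that KafkaStreams is constructed from java.util.Properties directly, without wrapping them in a StreamsConfig first. A minimal sketch (topology and broker address are illustrative only):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class PropertiesConstructorExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial pass-through topology
        final Topology topology = builder.build();

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "properties-ctor-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // After KIP-245 the Properties are passed directly; the StreamsConfig-taking
        // constructors are deprecated.
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
{code}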
[jira] [Closed] (KAFKA-6473) Add MockProcessorContext to public test-utils
[ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-6473. --- The PR for this work is merged: https://github.com/apache/kafka/pull/4736 > Add MockProcessorContext to public test-utils > - > > Key: KAFKA-6473 > URL: https://issues.apache.org/jira/browse/KAFKA-6473 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Major > Labels: needs-kip, user-experience > Fix For: 1.2.0 > > > With KIP-247, we added public test-utils artifact with a TopologyTestDriver > class. Using the test driver for a single > Processor/Transformer/ValueTransformer it's required to specify a whole > topology with source and sink and plus the > Processor/Transformer/ValueTransformer into it. > For unit testing, it might be more convenient to have a MockProcessorContext, > that can be used to test the Processor/Transformer/ValueTransformer in > isolation. Ie, the test itself creates new > Processor/Transformer/ValueTransformer object and calls init() manually > passing in the MockProcessorContext. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416494#comment-16416494 ] Guozhang Wang commented on KAFKA-6437: -- There is an interesting issue reported in KAFKA-6720: for join-involved topics, if a topic does not exist yet, an exception will be thrown. I think it is not a complete duplicate of this ticket, and I'd like to summarize the "inconsistent" behavior that we are facing today: 1) For a join operation on user topics directly (i.e. no reshuffling added, as Streams assumes input topics are already partitioned by key), we'd require the user topics to pre-exist; if not, we throw TopologyBuilderException. 2) For a join operation on repartition topics, since they are not available at the assignment phase, we "assume" the repartition topics will be created and become available, hence we do not check if the source topics are available. When the source topic is missing, and hence no data is sent to the repartition topics at all, Streams will hang (this is what this JIRA reported). 3) For stateless operations, if a source topic is missing, Streams will continue but generate a warning. So I think the actual fix should be as follows: 1) We can [collect all external topics' num.partitions|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L425-L437] at the very beginning of the assign() phase, and log a warning entry if some topic's metadata cannot be found (see the sketch below). 2) Given step one, we do not need to [query the metadata|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L341] again; we can get it directly from the collected num.partitions map. 3) Finally, in ensureCopartitioning, if the metadata cannot be found we skip the [co-partition checking phase|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L665] but log another warning that "since the topic is not found, we will skip the co-partition validation .." > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Assignee: Mariam John >Priority: Minor > Labels: newbie > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing.
> {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The applications does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue an `warn` or `error` level message at start to inform about the missing > topic an
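A small sketch of step 1) from the comment above: collect partition counts for all source topics up front and warn about any that are missing instead of failing later. Cluster#partitionCountForTopic is used for the lookup; the surrounding assignor wiring is omitted, and names are illustrative.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SourceTopicMetadataSketch {
    private static final Logger log = LoggerFactory.getLogger(SourceTopicMetadataSketch.class);

    // Gather partition counts once at the start of assign(); topics without metadata are
    // logged and left out, so later steps can skip co-partition validation for them.
    static Map<String, Integer> collectPartitionCounts(final Set<String> sourceTopics, final Cluster metadata) {
        final Map<String, Integer> counts = new HashMap<>();
        for (final String topic : sourceTopics) {
            final Integer count = metadata.partitionCountForTopic(topic);
            if (count == null) {
                log.warn("No partitions found for topic {}; skipping co-partition validation for it", topic);
            } else {
                counts.put(topic, count);
            }
        }
        return counts;
    }
}
{code}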
[jira] [Comment Edited] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416438#comment-16416438 ] Chris Egerton edited comment on KAFKA-6417 at 3/28/18 12:14 AM: [~cotedm] Alright, given that insight I agree that issuing warnings for unused JARs would just end up flooding logs with often-unnecessary messages and a different approach is warranted. To clarify, do you think it'd make more sense at this point to pursue changing the plugin structure or to catch the {{ClassNotFoundException}} caused by improper structure and add an error message about said improper structure before failing the connector? Open to either at this point, although still leaning away from changing plugin structure a small amount. was (Author: chrisegerton): Alright, given that insight I agree that issuing warnings for unused JARs would just end up flooding logs with often-unnecessary messages. To clarify, do you think it'd make more sense at this point to pursue changing the plugin structure or to catch the {{ClassNotFoundException}} caused by improper structure and add an error message about said improper structure before failing the connector? Open to either at this point, although still leaning away from changing plugin structure a small amount. > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6722) SensorAccess.getOrCreate should be more efficient
wade wu created KAFKA-6722: -- Summary: SensorAccess.getOrCreate should be more efficient Key: KAFKA-6722 URL: https://issues.apache.org/jira/browse/KAFKA-6722 Project: Kafka Issue Type: Improvement Reporter: wade wu The lock/unlock of the read lock in getOrCreate() is not necessary, or it should be refactored. For each request from a producer, this read lock is acquired and released, which costs time. It can be easily fixed using the code below, and it is still thread safe: var sensor: Sensor = metrics.getSensor(sensorName) if (sensor == null) { lock.writeLock().lock() try{ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6722) SensorAccess.getOrCreate should be more efficient
[ https://issues.apache.org/jira/browse/KAFKA-6722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wade wu updated KAFKA-6722: --- Description: The lock/unlock of read lock in getOrCreate() is not necessary, or it should be refactored. For each request from Producer, this read lock lock/unlock is called and lock/unlock, it is costing the time. The existing code is doing this in order to wait until the sensor initialization is finished, but this can be done when the sensor is created under the write lock, by having the thread sleep for a while (few milliseconds), and this time can be amortized, since sensor creating is a one time thing. It can be easily fixed using code below, and it is still thread safe: var sensor: Sensor = metrics.getSensor(sensorName) if (sensor == null) { lock.writeLock().lock() try{ was: The lock/unlock of read lock in getOrCreate() is not necessary, or it should be refactored. For each request from Producer, this read lock lock/unlock is called and lock/unlock, it is costing the time. It can be easily fixed using code below, and it is still thread safe: var sensor: Sensor = metrics.getSensor(sensorName) if (sensor == null) { lock.writeLock().lock() try{ > SensorAccess.getOrCreate should be more efficient > - > > Key: KAFKA-6722 > URL: https://issues.apache.org/jira/browse/KAFKA-6722 > Project: Kafka > Issue Type: Improvement >Reporter: wade wu >Priority: Major > > The lock/unlock of read lock in getOrCreate() is not necessary, or it should > be refactored. For each request from Producer, this read lock lock/unlock is > called and lock/unlock, it is costing the time. > The existing code is doing this in order to wait until the sensor > initialization is finished, but this can be done when the sensor is created > under the write lock, by having the thread sleep for a while (few > milliseconds), and this time can be amortized, since sensor creating is a one > time thing. > It can be easily fixed using code below, and it is still thread safe: > > var sensor: Sensor = metrics.getSensor(sensorName) > if (sensor == null) { > lock.writeLock().lock() > try{ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
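A Java sketch of the pattern the description gestures at (SensorAccess itself is Scala): check for the sensor without any lock on the hot path and take the write lock, with a re-check, only when the sensor has to be created. This illustrates double-checked locking against the Metrics registry, not the actual SensorAccess code.

{code:java}
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

class SensorAccessSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Metrics metrics;

    SensorAccessSketch(final Metrics metrics) {
        this.metrics = metrics;
    }

    // Fast path: no lock at all when the sensor already exists.
    // Slow path: write lock plus a re-check so the sensor is created exactly once.
    Sensor getOrCreate(final String sensorName) {
        Sensor sensor = metrics.getSensor(sensorName);
        if (sensor == null) {
            lock.writeLock().lock();
            try {
                sensor = metrics.getSensor(sensorName);
                if (sensor == null) {
                    sensor = metrics.sensor(sensorName);
                    // metric registration on the newly created sensor would happen here
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
        return sensor;
    }
}
{code}

Note that the read lock in the existing code also guards against returning a sensor before its metrics are fully registered; dropping it is the reporter's proposal, not an established fix.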
[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416542#comment-16416542 ] Dustin Cote commented on KAFKA-6417: I think catching the exception and suggesting an action to correct the problem is probably more sensible in the immediate term and may be good enough to keep people from doing something silly. I just wasn't sure how easy that is to implement. If it's not risky (i.e. won't catch something else we should be worried about) then I'm all for it :) Later on if people still struggle, then it's probably just too confusing and we can do something structurally easier to understand. Hope that helps! > plugin.path pointing at a plugin directory causes ClassNotFoundException > > > Key: KAFKA-6417 > URL: https://issues.apache.org/jira/browse/KAFKA-6417 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Dustin Cote >Priority: Major > > When using the {{plugin.path}} configuration for the Connect workers, the > user is expected to specify a list containing the following per the docs: > {quote} > The list should consist of top level directories that include any combination > of: a) directories immediately containing jars with plugins and their > dependencies b) uber-jars with plugins and their dependencies c) directories > immediately containing the package directory structure of classes of plugins > and their dependencies > {quote} > This means we would expect {{plugin.path=/usr/share/plugins}} for a structure > like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. > However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the > resulting behavior is that dependencies for {{myplugin1}} are not properly > loaded. This causes a {{ClassNotFoundException}} that is not intuitive to > debug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416382#comment-16416382 ] Srinivas Dhruvakumar edited comment on KAFKA-6649 at 3/28/18 2:31 AM: -- I am trying out the patch "high watermark could be incorrectly set to -1" KAFKA-3978. But I am unable to reproduce the above scenario " : org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 " Does anyone know how to reproduce the above error ? was (Author: srinivas.d...@gmail.com): I am trying out the patch "high watermark could be incorrectly set to -1". But I am unable to reproduce the above scenario " : org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 " Does anyone know how to reproduce the above error ? > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replication partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like that after the ReplicaFetcherThread 
is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. Further examining, the ShutdownableThread.scala object: > {noformat} > override def run(): Unit = { > info("Starting") > try { >while (isRunning) > doWork() > } catch { >case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) >case e: Throwable => > if (isRunning) >error("Error due to", e) > } finally { >shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. I am not sure whether this is the intended behavior > of the ShutdownableThread, or the exception should be caught and we should > keep calling doWork() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6473) Add MockProcessorContext to public test-utils
[ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6473: --- Labels: kip user-experience (was: needs-kip user-experience) > Add MockProcessorContext to public test-utils > - > > Key: KAFKA-6473 > URL: https://issues.apache.org/jira/browse/KAFKA-6473 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Major > Labels: kip, user-experience > Fix For: 1.2.0 > > > With KIP-247, we added public test-utils artifact with a TopologyTestDriver > class. Using the test driver for a single > Processor/Transformer/ValueTransformer it's required to specify a whole > topology with source and sink and plus the > Processor/Transformer/ValueTransformer into it. > For unit testing, it might be more convenient to have a MockProcessorContext, > that can be used to test the Processor/Transformer/ValueTransformer in > isolation. Ie, the test itself creates new > Processor/Transformer/ValueTransformer object and calls init() manually > passing in the MockProcessorContext. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer
Boyang Chen created KAFKA-6723: -- Summary: Separate "max.poll.record" for restore consumer and common consumer Key: KAFKA-6723 URL: https://issues.apache.org/jira/browse/KAFKA-6723 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen Assignee: Boyang Chen Currently, Kafka Streams uses the `max.poll.record` config for both the restore consumer and the normal stream consumer. In reality, they are doing different processing workloads, and in order to speed up restoration, the restore consumer should have a higher throughput, achieved by setting `max.poll.record` higher. The change involved is trivial: [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149] However, this is still a public API change (introducing a new config name), so we need a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
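For context, this is how the knob is set today: one consumer-prefixed value that the Streams config applies to the main consumer and, since the restore consumer's config is derived from the same consumer settings, to restoration as well; the ticket proposes splitting the two. Sketch only, with an illustrative topology and broker address.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class MaxPollRecordsToday {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restore-tuning-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A single max.poll.records value; at the time of this ticket it is shared by the
        // normal stream consumer and the restore consumer, which is what the proposed new
        // config would decouple.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic"); // a table means there is state to restore on restart

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
{code}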
[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer
[ https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416749#comment-16416749 ] Boyang Chen commented on KAFKA-6723: [~guozhang] [~liquanpei] [~mjsax] Thoughts on this? > Separate "max.poll.record" for restore consumer and common consumer > --- > > Key: KAFKA-6723 > URL: https://issues.apache.org/jira/browse/KAFKA-6723 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Minor > > Currently, Kafka Streams use `max.poll.record` config for both restore > consumer and normal stream consumer. In reality, they are doing different > processing workloads, and in order to speed up the restore speed, restore > consumer is supposed to have a higher throughput by setting `max.poll.record` > higher. The change involved is trivial: > [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149] > However, this is still a public API change (introducing a new config name), > so we need a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6642) Rack aware replica assignment in kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778 ] Ashish Surana commented on KAFKA-6642: -- The current task assignor is sticky, and it can be made rack-aware, so that replicas of the same task (active & standby) are assigned to different racks as much as possible. Approach # RACK_ID is added to StreamsConfig and needs to be passed when starting the kafka-streams application. All processes having the same RACK_ID are considered to be in the same rack. # No changes to the input-topic-partition-to-task assignment Assignment of tasks to stream instances: # We assign active tasks to the instances that previously had the same task as active. # Active tasks that couldn't be assigned in the first step are assigned to the instances that previously had the same task as standby. # Active tasks that still couldn't be assigned are assigned to instances round-robin, starting from the least-loaded instance # The above 3 steps are the same as the StickyAssignor; since there is only one active copy of each task, no extra rack-aware logic is required in this step. # Now we have to assign standby tasks, and here we assign each standby to an instance in a different rack than the one where its active task or other standbys are running. If we run out of racks, we can assign standby tasks in the same rack but on different instances. # This makes the assignment rack-aware, but it is best effort and doesn't guarantee anything, because we might not have capacity left in some racks, or we might have more replicas than racks, etc. Note: Here we are making the current StickyTaskAssignor rack-aware without changing its logic drastically. For example, the current assignor is only sticky for active tasks; the standby task assignment logic is not sticky, as it doesn't consider where the task was assigned previously. > Rack aware replica assignment in kafka streams > -- > > Key: KAFKA-6642 > URL: https://issues.apache.org/jira/browse/KAFKA-6642 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Ashish Surana >Priority: Major > > We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware > replica > assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]). > This request is to have a similar feature for kafka streams applications. > Standby tasks/standby replica assignment in kafka streams is currently not > rack aware, and this request is to make it rack aware for better availability. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
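To make the standby placement rule described above concrete (prefer an instance in a rack that does not yet host the task, otherwise fall back to a different instance in an already-used rack), here is a small, self-contained Java sketch of the best-effort heuristic. It is illustrative only, not the StickyTaskAssignor code; instances, racks, and tasks are plain strings for simplicity.
{code:java}
import java.util.*;

// Toy sketch of rack-aware standby placement. Not Kafka Streams code.
public class RackAwareStandbySketch {

    static String pickStandbyInstance(String taskId,
                                      Map<String, String> instanceToRack,           // instance -> rack ("default-rack" if none given)
                                      Map<String, Set<String>> currentAssignment) { // instance -> tasks already placed there
        // Racks that already host a copy (active or standby) of this task.
        Set<String> racksWithTask = new HashSet<>();
        for (Map.Entry<String, Set<String>> e : currentAssignment.entrySet()) {
            if (e.getValue().contains(taskId)) {
                racksWithTask.add(instanceToRack.get(e.getKey()));
            }
        }
        String sameRackFallback = null;
        for (String instance : instanceToRack.keySet()) {
            Set<String> tasks = currentAssignment.getOrDefault(instance, Collections.emptySet());
            if (tasks.contains(taskId)) continue;                        // never co-locate copies on one instance
            if (!racksWithTask.contains(instanceToRack.get(instance))) {
                return instance;                                          // preferred: a rack not used yet
            }
            if (sameRackFallback == null) sameRackFallback = instance;    // fallback: same rack, other instance
        }
        return sameRackFallback;                                          // may be null if no capacity remains
    }

    public static void main(String[] args) {
        Map<String, String> racks = new LinkedHashMap<>();
        racks.put("i1", "rackA"); racks.put("i2", "rackA"); racks.put("i3", "rackB");
        Map<String, Set<String>> assignment = new HashMap<>();
        assignment.put("i1", new HashSet<>(Collections.singleton("task_0_1"))); // active copy on i1/rackA
        System.out.println(pickStandbyInstance("task_0_1", racks, assignment));  // prints i3 (different rack)
    }
}
{code}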
[jira] [Comment Edited] (KAFKA-6642) Rack aware replica assignment in kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778 ] Ashish Surana edited comment on KAFKA-6642 at 3/28/18 4:12 AM: --- Current task assignor is sticky, and it can be made rack-aware. Where we ensure that same tasks (active & replicas) are assigned different racks as much as possible. Approach # RACK_ID is added in StreamsConfig file, and needs to be passed while starting kafka-streams application. All the processes having same rack_id are considered in the same rack. # No changes in input topic partition to task assignment Assignment of tasks to stream instances: # We assign active tasks to the instances which were having same task as active previously. # Active Tasks which couldn't be assigned in first step are assigned to the instances which were having same task as standby previously. # Active tasks which still couldn't be assigned to instances in round-robin starting from least-loaded instance # Above 3 steps are same as StickyAssignor as there is only one unique active task so no extra rack aware logic is required in this step. # Now we have to assign standy-tasks, and here we assign standby to instances in different rack then it's active task or other standy-tasks are running. If we run out of racks then we can assign standby-tasks in same rack but different instances. # This makes the assignment rack-aware but more of a best effort and doesn't guarantee anything. This is because we might not have capacity left in some racks or we might have more number of replicas than number of racks etc Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't change the logic drastically. For example, current assignor is only sticky for active tasks, and standby task assignment logic is not sticky as it doesn't look for where the task was assigned previously. Scenario#1 When no RACK_ID is not passed in any of the stream instances. In this case, assignment will happen as it's happening currently by StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are considered to be part of single default-rack. Scenario#2 When RACK_ID is passed in all the stream instances. In this case, all instances belong to one or the other rack, and assignment is rack-aware as per above approach. Scenario#3 When RACK_ID is passed in some stream instances but not in all. In this case, all the instances with RACK_ID will belong to the provided racks. All the instances for whom RACK_ID were not passed, will be considered to be part of single default-rack. Please let us know what you guys think about approach. was (Author: asurana): Current task assignor is sticky, and it can be made rack-aware. Where we ensure that same tasks (active & replicas) are assigned different racks as much as possible. Approach # RACK_ID is added in StreamsConfig file, and needs to be passed while starting kafka-streams application. All the processes having same rack_id are considered in the same rack. # No changes in input topic partition to task assignment Assignment of tasks to stream instances: # We assign active tasks to the instances which were having same task as active previously. # Active Tasks which couldn't be assigned in first step are assigned to the instances which were having same task as standby previously. 
# Active tasks which still couldn't be assigned to instances in round-robin starting from least-loaded instance # Above 3 steps are same as StickyAssignor as there is only one unique active task so no extra rack aware logic is required in this step. # Now we have to assign standy-tasks, and here we assign standby to instances in different rack then it's active task or other standy-tasks are running. If we run out of racks then we can assign standby-tasks in same rack but different instances. # This makes the assignment rack-aware but more of a best effort and doesn't guarantee anything. This is because we might not have capacity left in some racks or we might have more number of replicas than number of racks etc Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't change the logic drastically. For example, current assignor is only sticky for active tasks, and standby task assignment logic is not sticky as it doesn't look for where the task was assigned previously. > Rack aware replica assignment in kafka streams > -- > > Key: KAFKA-6642 > URL: https://issues.apache.org/jira/browse/KAFKA-6642 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Ashish Surana >Priority: Major > > We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware > replic
[jira] [Comment Edited] (KAFKA-6642) Rack aware replica assignment in kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778 ] Ashish Surana edited comment on KAFKA-6642 at 3/28/18 4:13 AM: --- Current task assignor is sticky, and it can be made rack-aware. Where we ensure that same tasks (active & replicas) are assigned different racks as much as possible. Approach # RACK_ID is added in StreamsConfig file, and needs to be passed while starting kafka-streams application. All the processes having same rack_id are considered in the same rack. # No changes in input topic partition to task assignment Assignment of tasks to stream instances: # We assign active tasks to the instances which were having same task as active previously. # Active Tasks which couldn't be assigned in first step are assigned to the instances which were having same task as standby previously. # Active tasks which still couldn't be assigned to instances in round-robin starting from least-loaded instance # Above 3 steps are same as StickyAssignor as there is only one unique active task so no extra rack aware logic is required in this step. # Now we have to assign standy-tasks, and here we assign standby to instances in different rack then it's active task or other standy-tasks are running. If we run out of racks then we can assign standby-tasks in same rack but different instances. # This makes the assignment rack-aware but more of a best effort and doesn't guarantee anything. This is because we might not have capacity left in some racks or we might have more number of replicas than number of racks etc Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't change the logic drastically. For example, current assignor is only sticky for active tasks, and standby task assignment logic is not sticky as it doesn't look for where the task was assigned previously. Scenario#1 When RACK_ID is not passed in any of the stream instances. In this case, assignment will happen as it's happening currently by StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are considered to be part of single default-rack. Scenario#2 When RACK_ID is passed in all the stream instances. In this case, all instances belong to one or the other rack, and assignment is rack-aware as per above approach. Scenario#3 When RACK_ID is passed in some stream instances but not in all. In this case, all the instances with RACK_ID will belong to the provided racks. All the instances for whom RACK_ID were not passed, will be considered to be part of single default-rack. Please let us know what you guys think about approach. was (Author: asurana): Current task assignor is sticky, and it can be made rack-aware. Where we ensure that same tasks (active & replicas) are assigned different racks as much as possible. Approach # RACK_ID is added in StreamsConfig file, and needs to be passed while starting kafka-streams application. All the processes having same rack_id are considered in the same rack. # No changes in input topic partition to task assignment Assignment of tasks to stream instances: # We assign active tasks to the instances which were having same task as active previously. # Active Tasks which couldn't be assigned in first step are assigned to the instances which were having same task as standby previously. 
# Active tasks which still couldn't be assigned to instances in round-robin starting from least-loaded instance # Above 3 steps are same as StickyAssignor as there is only one unique active task so no extra rack aware logic is required in this step. # Now we have to assign standy-tasks, and here we assign standby to instances in different rack then it's active task or other standy-tasks are running. If we run out of racks then we can assign standby-tasks in same rack but different instances. # This makes the assignment rack-aware but more of a best effort and doesn't guarantee anything. This is because we might not have capacity left in some racks or we might have more number of replicas than number of racks etc Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't change the logic drastically. For example, current assignor is only sticky for active tasks, and standby task assignment logic is not sticky as it doesn't look for where the task was assigned previously. Scenario#1 When no RACK_ID is not passed in any of the stream instances. In this case, assignment will happen as it's happening currently by StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are considered to be part of single default-rack. Scenario#2 When RACK_ID is passed in all the stream instances. In this case, all instances belong to one or the other rack, and assignment is rack-aware as per above approach
[jira] [Comment Edited] (KAFKA-6642) Rack aware replica assignment in kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778 ] Ashish Surana edited comment on KAFKA-6642 at 3/28/18 4:18 AM: --- Current task assignor is sticky, and it can be made rack-aware with few changes. Where we ensure that same tasks (active & replicas) are assigned on different racks as much as possible. Approach # RACK_ID can be added in StreamsConfig file, and needs to be passed while starting kafka-streams application. All the processes having same RACK_ID are considered in the same rack. # No changes in partition to task assignment Assignment of tasks to instances: # We assign active tasks to the instances where same task was running as active previously. # Active Tasks which couldn't be assigned in first step are assigned to the instances where same task was running as standby previously # Active tasks that still couldn't be assigned, are assigned to instances in round-robin way starting from least-loaded instance # Above 3 steps are same as StickyAssignor as there is only one active task for any task_id so no extra rack aware logic is required in assigning active tasks. # Now we have to assign standy-task, and here we assign these to instances running in racks other than the one with its active task. If we run out of racks then we can assign standby-tasks in same rack but on different instances. # This makes the assignment rack-aware but more of a best effort and doesn't guarantee anything. This is because we might not have capacity left in some racks or we might have more number of replicas than number of racks etc Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't change the logic drastically. For example, current assignor is only sticky for active tasks, and standby task assignment logic is not sticky as it doesn't look for where the task was assigned previously. Scenario#1 When RACK_ID is not passed in any of the stream instances. In this case, assignment will happen as it's happening currently by StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are considered to be part of single default-rack. Scenario#2 When RACK_ID is passed in all the stream instances. In this case, all instances belong to one or the other rack, and assignment is rack-aware as per above approach. Scenario#3 When RACK_ID is passed in some stream instances but not in all. In this case, all the instances with RACK_ID will belong to the provided racks. All the instances for whom RACK_ID were not passed, will be considered to be part of single default-rack. Please let us know what you guys think about approach. was (Author: asurana): Current task assignor is sticky, and it can be made rack-aware. Where we ensure that same tasks (active & replicas) are assigned different racks as much as possible. Approach # RACK_ID is added in StreamsConfig file, and needs to be passed while starting kafka-streams application. All the processes having same rack_id are considered in the same rack. # No changes in input topic partition to task assignment Assignment of tasks to stream instances: # We assign active tasks to the instances which were having same task as active previously. # Active Tasks which couldn't be assigned in first step are assigned to the instances which were having same task as standby previously. 
# Active tasks which still couldn't be assigned to instances in round-robin starting from least-loaded instance # Above 3 steps are same as StickyAssignor as there is only one unique active task so no extra rack aware logic is required in this step. # Now we have to assign standy-tasks, and here we assign standby to instances in different rack then it's active task or other standy-tasks are running. If we run out of racks then we can assign standby-tasks in same rack but different instances. # This makes the assignment rack-aware but more of a best effort and doesn't guarantee anything. This is because we might not have capacity left in some racks or we might have more number of replicas than number of racks etc Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't change the logic drastically. For example, current assignor is only sticky for active tasks, and standby task assignment logic is not sticky as it doesn't look for where the task was assigned previously. Scenario#1 When RACK_ID is not passed in any of the stream instances. In this case, assignment will happen as it's happening currently by StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are considered to be part of single default-rack. Scenario#2 When RACK_ID is passed in all the stream instances. In this case, all instances belong to one or the other rack, and assignment is rack-awar
[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer
[ https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416806#comment-16416806 ] Matthias J. Sax commented on KAFKA-6723: Makes sense to me. Seems to be related to KAFKA-6657 -- do you think KAFKA-6657 subsumes this ticket? One open question, regardless of KAFKA-6657, is whether the default values for `max.poll.records` should be different for the two consumers. > Separate "max.poll.record" for restore consumer and common consumer > --- > > Key: KAFKA-6723 > URL: https://issues.apache.org/jira/browse/KAFKA-6723 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Minor > > Currently, Kafka Streams use `max.poll.record` config for both restore > consumer and normal stream consumer. In reality, they are doing different > processing workloads, and in order to speed up the restore speed, restore > consumer is supposed to have a higher throughput by setting `max.poll.record` > higher. The change involved is trivial: > [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149] > However, this is still a public API change (introducing a new config name), > so we need a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6642) Rack aware task assignment in kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashish Surana updated KAFKA-6642: - Summary: Rack aware task assignment in kafka streams (was: Rack aware replica assignment in kafka streams) > Rack aware task assignment in kafka streams > --- > > Key: KAFKA-6642 > URL: https://issues.apache.org/jira/browse/KAFKA-6642 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Ashish Surana >Priority: Major > > We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware > replica > assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]). > This request is to have a similar feature for kafka streams applications. > Standby tasks/standby replica assignment in kafka streams is currently not > rack aware, and this request is to make it rack aware for better availability. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer
[ https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416816#comment-16416816 ] Guozhang Wang commented on KAFKA-6723: -- What are the driving rationales for different default values for different consumer types? > Separate "max.poll.record" for restore consumer and common consumer > --- > > Key: KAFKA-6723 > URL: https://issues.apache.org/jira/browse/KAFKA-6723 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Minor > > Currently, Kafka Streams use `max.poll.record` config for both restore > consumer and normal stream consumer. In reality, they are doing different > processing workloads, and in order to speed up the restore speed, restore > consumer is supposed to have a higher throughput by setting `max.poll.record` > higher. The change involved is trivial: > [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149] > However, this is still a public API change (introducing a new config name), > so we need a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416847#comment-16416847 ] ASF GitHub Bot commented on KAFKA-6711: --- cemo closed pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-mem… URL: https://github.com/apache/kafka/pull/4782 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 56e6bed0850..bd4f67e9de1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -334,10 +335,33 @@ public void close(final Map offsets) throws IOException { @Override public void checkpoint(final Map offsets) { + +// Find non persistent store's topics +final Map storeToChangelogTopic = topology.storeToChangelogTopic(); +final Set globalNonPersistentStoresTopics = new HashSet<>(); +for (final StateStore store : topology.globalStateStores()) { +if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); +} +} + checkpointableOffsets.putAll(offsets); -if (!checkpointableOffsets.isEmpty()) { + +final Map filteredOffsets = new HashMap<>(); + +// Skip non persistent store +for (final Map.Entry topicPartitionOffset : checkpointableOffsets.entrySet()) { +final String topic = topicPartitionOffset.getKey().topic(); +if (globalNonPersistentStoresTopics.contains(topic)) { +log.debug("Skipping global store' topic {}", topic); +} else { +filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); +} +} + +if (!filteredOffsets.isEmpty()) { try { -checkpoint.write(checkpointableOffsets); +checkpoint.write(filteredOffsets); } catch (IOException e) { log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index df8d2010d24..c449ec5f527 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -488,6 +488,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws IOException { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } +@Test +public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { +stateManager.initialize(); +initializeConsumer(10, 1, t3); +stateManager.register(store3, stateRestoreCallback); +stateManager.close(Collections.emptyMap()); + +assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap())); +} + private Map readOffsetsCheckpoint() throws 
IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index ae46b8dadaa..08945d5047a 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -95,7 +95,7 @@ public void close() { @Override public boolean persistent() { -return false; +return rocksdbStore; } @Override This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific co
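Since JIRA stripped the generic type parameters from the diff above, here is a compact restatement of the filtering idea with the types filled back in as an inference (not copied verbatim from the patch): before the checkpoint file is written, drop offsets whose changelog topic backs a non-persistent (in-memory) global store.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Simplified restatement of the PR's checkpoint filtering; type parameters
// are inferred from context, not taken literally from the patch.
final class CheckpointFilterSketch {

    static Map<TopicPartition, Long> filterNonPersistent(
            final Map<TopicPartition, Long> checkpointableOffsets,
            final Set<String> nonPersistentChangelogTopics) {

        final Map<TopicPartition, Long> filtered = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> entry : checkpointableOffsets.entrySet()) {
            if (!nonPersistentChangelogTopics.contains(entry.getKey().topic())) {
                filtered.put(entry.getKey(), entry.getValue()); // keep offsets of persistent stores only
            }
        }
        return filtered; // write this map to the checkpoint file (if non-empty)
    }
}
{code}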
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416859#comment-16416859 ] ASF GitHub Bot commented on KAFKA-6711: --- cemo opened a new pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-mem… URL: https://github.com/apache/kafka/pull/4782 This PR addresses the issue of persisting offsets of non-persistent stores into the checkpoint file. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions
[ https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416862#comment-16416862 ] Srinivas Dhruvakumar commented on KAFKA-6666: - [~huxi_2b] - tried the latest KAFKA-3978 patch; still hitting this bug > OffsetOutOfRangeException: Replica Thread Stopped Resulting in > Underreplicated Partitions > - > > Key: KAFKA-6666 > URL: https://issues.apache.org/jira/browse/KAFKA-6666 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.1 >Reporter: Srinivas Dhruvakumar >Priority: Critical > Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png > > > Hello All, > Currently we are seeing a few underreplicated partitions on our test cluster, > which is used for integration testing. On debugging further, we found the replica > thread was stopped due to an error: > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 50 of partition since it is larger > than the high watermark -1 > Kindly find the attached screenshot. > !Screen Shot 2018-03-15 at 3.52.13 PM.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416912#comment-16416912 ] Srinivas Dhruvakumar commented on KAFKA-6649: - [~hachikuji] Still hitting the bug after testing with the patch. > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replication partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like that after the ReplicaFetcherThread is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. Further examining, the ShutdownableThread.scala object: > {noformat} > override def run(): Unit = { > info("Starting") > try { >while (isRunning) > doWork() > } catch { >case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) >case e: Throwable => > if (isRunning) >error("Error due to", e) > } finally { >shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. 
I am not sure whether this is the intended behavior > of the ShutdownableThread, or the exception should be caught and we should > keep calling doWork() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)