[jira] [Comment Edited] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-27 Thread JIRA
[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414673#comment-16414673
 ] 

Cemalettin Koç edited comment on KAFKA-6711 at 3/27/18 7:13 AM:


[~guozhang] Would you please check the implementation:

https://github.com/cemo/kafka/commits/b-kafka-6711

I have done something preliminary. I will add the necessary tests after your 
guidance.


was (Author: cemo):
[~guozhang] Would you please check the implementation:

[https://github.com/cemo/kafka/commit/0cb2482259fec897f396e8b84ffb1921c4f3f63e]

I have done something preliminary. I will add the necessary tests after your 
guidance.

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable, and I noticed that 
> after each restart I am losing all my data. When I debugged it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains an 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land, so there might be another way to fix the 
> issue. 
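
For illustration, here is a minimal sketch of the kind of guard being discussed: 
only stores that report themselves as persistent via the standard 
{{StateStore#persistent()}} flag contribute offsets to the checkpoint file. The 
helper name and the store-lookup map are hypothetical, not the actual 
GlobalStateManagerImpl code.

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

// Sketch only: keep checkpoint offsets for persistent stores and drop the rest,
// so an in-memory global store never leaves a stale offset behind on restart.
static Map<TopicPartition, Long> checkpointableOffsets(
        final Map<TopicPartition, Long> offsets,
        final Map<TopicPartition, StateStore> storesByPartition) { // hypothetical lookup
    final Map<TopicPartition, Long> result = new HashMap<>();
    for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
        final StateStore store = storesByPartition.get(entry.getKey());
        if (store != null && store.persistent()) {
            result.put(entry.getKey(), entry.getValue());
        }
    }
    return result;
}
{code}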



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for different consumers

2018-03-27 Thread Yuancheng PENG (JIRA)
Yuancheng PENG created KAFKA-6717:
-

 Summary: TopicPartition Assigned twice to a consumer group for 
different consumers 
 Key: KAFKA-6717
 URL: https://issues.apache.org/jira/browse/KAFKA-6717
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Yuancheng PENG


I'm using {{StickyAssignor}} to consume more than 100 topics matching a certain 
pattern.

There are 10 consumers with the same group id.

I expected each topic-partition to be assigned to only one consumer instance. 
However, some topic-partitions are assigned to 2 different consumer instances, 
hence the consumer group processes duplicate messages.
{code:java}
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(StickyAssignor.class));

KafkaConsumer c = new KafkaConsumer<>(props);
c.subscribe(Pattern.compile(TOPIC_PATTERN), new NoOpConsumerRebalanceListener());
{code}
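
To narrow down where the duplicate assignment comes from, one option (a 
diagnostic sketch, not the reporter's code; it reuses {{c}} and 
{{TOPIC_PATTERN}} from the snippet above) is to replace the no-op listener with 
one that logs every rebalance, so the assignments of the 10 instances can be 
compared side by side:

{code:java}
import java.util.Collection;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Log each rebalance so a topic-partition that shows up on two instances at the
// same time becomes visible when the logs of all group members are compared.
c.subscribe(Pattern.compile(TOPIC_PATTERN), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        System.out.println("assigned: " + partitions);
    }
});
{code}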
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances

2018-03-27 Thread Yuancheng PENG (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuancheng PENG updated KAFKA-6717:
--
Summary: TopicPartition Assigned twice to a consumer group for 2 consumer 
instances   (was: TopicPartition Assigned twice to a consumer group for 
different consumers )

> TopicPartition Assigned twice to a consumer group for 2 consumer instances 
> --
>
> Key: KAFKA-6717
> URL: https://issues.apache.org/jira/browse/KAFKA-6717
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Yuancheng PENG
>Priority: Major
>
> I'm using {{StickyAssignor}} to consume more than 100 topics matching a 
> certain pattern.
> There are 10 consumers with the same group id.
> I expected each topic-partition to be assigned to only one consumer instance. 
> However, some topic-partitions are assigned to 2 different consumer 
> instances, hence the consumer group processes duplicate messages.
> {code:java}
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(StickyAssignor.class));
> KafkaConsumer c = new KafkaConsumer<>(props);
> c.subscribe(Pattern.compile(TOPIC_PATTERN), new NoOpConsumerRebalanceListener());
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Deepak Goyal (JIRA)
Deepak Goyal created KAFKA-6718:
---

 Summary: Rack Aware Replica Task Assignment for Kafka Streams
 Key: KAFKA-6718
 URL: https://issues.apache.org/jira/browse/KAFKA-6718
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 1.1.0
Reporter: Deepak Goyal


This feature enables replica tasks to be assigned to different racks.
Replication factor = x
Number of replica tasks = x
totalTasks = x+1 (replica + active)
 # If no racks are provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.
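
As a rough illustration of how the proposal might look from the application 
side (the rack id config does not exist in released Kafka at this point; 
"rack.id", the broker address and the topology are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// Sketch of the proposed usage: standby replicas plus a per-instance rack id
// that the assignor could use to spread active and standby tasks across racks.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);  // x standby replicas
props.put("rack.id", "rack-a");                           // proposed RACK_ID_CONFIG, not an existing API

StreamsBuilder builder = new StreamsBuilder();
// ... build the topology ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
{code}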



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Deepak Goyal (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deepak Goyal updated KAFKA-6718:

Description: 
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation, as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, this provides 
fault tolerance: if a rack goes down, the remaining racks can continue to serve 
traffic.
 
This feature is already implemented in Kafka, but we needed something similar 
for task assignments at the Kafka Streams application layer. 
 
This feature enables replica tasks to be assigned to different racks for 
fault tolerance.
NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
helps the StickyPartitionAssignor assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
After that, it also helps to maintain stickiness within the rack.

  was:
This feature enables replica tasks to be assigned to different racks.
Replication factor = x
Number of replica tasks = x
totalTasks = x+1 (replica + active)
 # If no racks are provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.


> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Priority: Major
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation, as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>  
> This feature is already implemented in Kafka, but we needed something similar 
> for task assignments at the Kafka Streams application layer. 
>  
> This feature enables replica tasks to be assigned to different racks for 
> fault tolerance.
> NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks >= number of racks), then the cluster will be rack aware, 
> i.e. each replica task is assigned to a different rack.
>  # If (totalTasks < number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, 
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps the StickyPartitionAssignor assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
> After that, it also helps to maintain stickiness within the rack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Deepak Goyal (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deepak Goyal updated KAFKA-6718:

Description: 
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation, as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, this provides 
fault tolerance: if a rack goes down, the remaining racks can continue to serve 
traffic.
  
 This feature is already implemented in Kafka 
([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
but we needed something similar for task assignments at the Kafka Streams 
application layer. 
  
 This feature enables replica tasks to be assigned to different racks for 
fault tolerance.
 NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
helps the StickyPartitionAssignor assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
 After that, it also helps to maintain stickiness within the rack.

  was:
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation, as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, this provides 
fault tolerance: if a rack goes down, the remaining racks can continue to serve 
traffic.
 
This feature is already implemented in Kafka, but we needed something similar 
for task assignments at the Kafka Streams application layer. 
 
This feature enables replica tasks to be assigned to different racks for 
fault tolerance.
NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
helps the StickyPartitionAssignor assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
After that, it also helps to maintain stickiness within the rack.


> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Priority: Major
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation, as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> ([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
> but we needed something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks >= number of racks), then the cluster will be rack aware, 
> i.e. each replica task is assigned to a different rack.
>  # If (totalTasks < number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, 
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps the StickyPartitionAssignor assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Deepak Goyal (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deepak Goyal updated KAFKA-6718:

Description: 
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation, as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, this provides 
fault tolerance: if a rack goes down, the remaining racks can continue to serve 
traffic.
  
 This feature is already implemented in Kafka 
([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
but we needed something similar for task assignments at the Kafka Streams 
application layer. 
  
 This feature enables replica tasks to be assigned to different racks for 
fault tolerance.
 NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
helps the StickyPartitionAssignor assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
 After that, it also helps to maintain stickiness within the rack.

  was:
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation, as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, this provides 
fault tolerance: if a rack goes down, the remaining racks can continue to serve 
traffic.
  
 This feature is already implemented in Kafka 
([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
but we needed something similar for task assignments at the Kafka Streams 
application layer. 
  
 This feature enables replica tasks to be assigned to different racks for 
fault tolerance.
 NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, 
i.e. each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, 
cluster wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
helps the StickyPartitionAssignor assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
 After that, it also helps to maintain stickiness within the rack.


> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Priority: Major
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation, as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> ([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
> but we needed something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks >= number of racks), then the cluster will be rack aware, 
> i.e. each replica task is assigned to a different rack.
>  # If (totalTasks < number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, 
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps the StickyPartitionAssignor assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6698) ConsumerBounceTest#testClose sometimes fails

2018-03-27 Thread Ted Yu (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407577#comment-16407577
 ] 

Ted Yu edited comment on KAFKA-6698 at 3/27/18 3:17 PM:


Test output consisted of repeated occurrences of:

{code}
[2018-03-21 07:02:01,683] ERROR ZKShutdownHandler is not registered, so 
ZooKeeper server won't take any action on ERROR or SHUTDOWN server state 
changes (org.apache.zookeeper.server.ZooKeeperServer:472)
[2018-03-21 07:02:02,693] WARN Session 0x0 for server null, unexpected error, 
closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn:1162)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
[2018-03-21 07:02:03,794] WARN Session 0x0 for server null, unexpected error, 
closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn:1162)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
{code}


was (Author: yuzhih...@gmail.com):
Test output consisted of repeated occurrences of:
{code}
[2018-03-21 07:02:01,683] ERROR ZKShutdownHandler is not registered, so 
ZooKeeper server won't take any action on ERROR or SHUTDOWN server state 
changes (org.apache.zookeeper.server.ZooKeeperServer:472)
[2018-03-21 07:02:02,693] WARN Session 0x0 for server null, unexpected error, 
closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn:1162)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
[2018-03-21 07:02:03,794] WARN Session 0x0 for server null, unexpected error, 
closing socket connection and attempting reconnect 
(org.apache.zookeeper.ClientCnxn:1162)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
{code}

> ConsumerBounceTest#testClose sometimes fails
> 
>
> Key: KAFKA-6698
> URL: https://issues.apache.org/jira/browse/KAFKA-6698
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> Saw the following in 
> https://builds.apache.org/job/kafka-1.1-jdk7/94/testReport/junit/kafka.api/ConsumerBounceTest/testClose/
>  :
> {code}
> org.apache.kafka.common.errors.TimeoutException: The consumer group command 
> timed out while waiting for group to initialize: 
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-03-27 Thread Jason Gustafson (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6446.

   Resolution: Fixed
Fix Version/s: 1.2.0

> KafkaProducer with transactionId endless waits when bootstrap server is down
> 
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Eduardo Sciullo
>Assignee: huxihx
>Priority: Critical
> Fix For: 1.2.0
>
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId waits 
> endlessly on initTransactions. 
> The timeouts don't apply to that operation: it doesn't honor 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached is an example of my code to reproduce the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  
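
With the fix referenced by this resolution, {{initTransactions()}} is bounded 
by {{max.block.ms}} and surfaces a {{TimeoutException}} instead of blocking 
forever. A minimal sketch of what calling code can then do (broker address and 
ids are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    try {
        producer.initTransactions();   // bounded by max.block.ms after the fix
    } catch (TimeoutException e) {
        // the broker is unreachable; it is safe to retry initTransactions() later
    }
}
{code}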



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415868#comment-16415868
 ] 

ASF GitHub Bot commented on KAFKA-6446:
---

hachikuji closed pull request #4563: KAFKA-6446: KafkaProducer should use timed 
version of `await` to avoid endless waiting
URL: https://github.com/apache/kafka/pull/4563
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5fc9a1b9b38..a5af5b60093 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -256,6 +256,7 @@
     private final ProducerInterceptors interceptors;
     private final ApiVersions apiVersions;
     private final TransactionManager transactionManager;
+    private TransactionalRequestResult initTransactionsResult;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -555,18 +556,36 @@ private static int parseAcks(String acksString) {
      *   2. Gets the internal producer id and epoch, used in all future transactional
      *      messages issued by the producer.
      *
+     * Note that this method will raise {@link TimeoutException} if the transactional state cannot
+     * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
+     * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
+     * initialized, this method should no longer be used.
+     *
      * @throws IllegalStateException if no transactional.id has been configured
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws TimeoutException if the time taken for initialize the transaction has surpassed max.block.ms.
+     * @throws InterruptException if the thread is interrupted while blocked
      */
     public void initTransactions() {
         throwIfNoTransactionManager();
-        TransactionalRequestResult result = transactionManager.initializeTransactions();
-        sender.wakeup();
-        result.await();
+        if (initTransactionsResult == null) {
+            initTransactionsResult = transactionManager.initializeTransactions();
+            sender.wakeup();
+        }
+
+        try {
+            if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
+                initTransactionsResult = null;
+            } else {
+                throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms.");
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptException("Initialize transactions interrupted.", e);
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7eea4992b33..426b273b885 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) {
             return false;
 
         AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder();
-        while (true) {
+        while (running) {
             Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index ff93da872dc..9c02e94c045 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -59,7 +59,10 @@ public void await() {
     }
 

[jira] [Updated] (KAFKA-5540) Deprecate and remove internal converter configs

2018-03-27 Thread Chris Egerton (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-5540:
-
Fix Version/s: 1.2.0

> Deprecate and remove internal converter configs
> ---
>
> Key: KAFKA-5540
> URL: https://issues.apache.org/jira/browse/KAFKA-5540
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> The internal.key.converter and internal.value.converter were original exposed 
> as configs because a) they are actually pluggable and b) providing a default 
> would require relying on the JsonConverter always being available, which 
> until we had classloader isolation it was possible might be removed for 
> compatibility reasons.
> However, this has ultimately just caused a lot more trouble and confusion 
> than it is worth. We should deprecate the configs, give them a default of 
> JsonConverter (which is also kind of nice since it results in human-readable 
> data in the internal topics), and then ultimately remove them in the next 
> major version.
> These are all public APIs so this will need a small KIP before we can make 
> the change.
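
For reference, a hedged example of the worker settings in question with the 
JsonConverter default that the ticket proposes (property names as documented 
for Connect; the schemas.enable flags are shown because the internal topics are 
typically written without schemas):

{code}
# connect-worker.properties (illustrative)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
{code}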



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5540) Deprecate and remove internal converter configs

2018-03-27 Thread Chris Egerton (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-5540:


Assignee: Chris Egerton

> Deprecate and remove internal converter configs
> ---
>
> Key: KAFKA-5540
> URL: https://issues.apache.org/jira/browse/KAFKA-5540
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> The internal.key.converter and internal.value.converter were original exposed 
> as configs because a) they are actually pluggable and b) providing a default 
> would require relying on the JsonConverter always being available, which 
> until we had classloader isolation it was possible might be removed for 
> compatibility reasons.
> However, this has ultimately just caused a lot more trouble and confusion 
> than it is worth. We should deprecate the configs, give them a default of 
> JsonConverter (which is also kind of nice since it results in human-readable 
> data in the internal topics), and then ultimately remove them in the next 
> major version.
> These are all public APIs so this will need a small KIP before we can make 
> the change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Chris Egerton (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415952#comment-16415952
 ] 

Chris Egerton commented on KAFKA-6417:
--

Just detecting a JAR file in a {{plugin.path}} directory isn't sufficient cause 
for alarm, since uber JARs are supported for plugins. An alternative could be 
to log a warning if a JAR file is detected in a {{plugin.path}} directory that 
doesn't contain any plugins. The warning could read something like "Archive 
file  in plugin path directory  does not 
contain any recognizable plugins and will not be used, even as a dependency for 
other plugins in the same directory."

Not completely in love with that wording/criteria; in the event that your 
{{plugin.path}} looks like {{/plugin/path,plugin/path/plugin1}}, non-plugin 
JARs found in the {{plugin/path/plugin1}} directory will then be incorrectly 
flagged even though they can be used as dependencies for {{plugin1}} since it 
is correctly formatted for use as a plugin in the {{/plugin/path}} directory.

[~cotedm], thoughts?

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 
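
A hypothetical layout that illustrates the distinction (directory and plugin 
names are made up):

{code}
/usr/share/plugins/
  myplugin1/
    myplugin1.jar
    myplugin1-dependency.jar
  myplugin2/
    myplugin2-uber.jar

plugin.path=/usr/share/plugins              <- correct: the top-level directory
plugin.path=/usr/share/plugins/myplugin1    <- this bug: myplugin1's dependencies are not loaded
{code}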



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Guozhang Wang (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6718:


Assignee: Deepak Goyal

> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation, as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> ([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
> but we needed something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks >= number of racks), then the cluster will be rack aware, 
> i.e. each replica task is assigned to a different rack.
>  # If (totalTasks < number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, 
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps the StickyPartitionAssignor assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Guozhang Wang (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415967#comment-16415967
 ] 

Guozhang Wang commented on KAFKA-6718:
--

[~_deepakgoyal] thanks for creating the KIP! I've assigned the JIRA to you.

About the KIP itself, please note that if you are enhancing the rebalance 
protocol to encode the rack id information, these two KIPs are correlated and 
I'd recommend you read about them first:

Any protocol changes will need to consider a smooth upgrade path:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade

We'd like to encode some other information into the metadata to enhance 
partition assignor's workload balance awareness:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-262%3A+Metadata+should+include+number+of+state+stores+for+task

> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation, as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> ([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
> but we needed something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks >= number of racks), then the cluster will be rack aware, 
> i.e. each replica task is assigned to a different rack.
>  # If (totalTasks < number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, 
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps the StickyPartitionAssignor assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6127) Streams should never block infinitely

2018-03-27 Thread Matthias J. Sax (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6127:
---
Description: 
Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
{{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
(fixed in KAFKA-6446), and we should double-check whether the code handles this 
case correctly.

If we block within one operation, the whole {{StreamThread}} blocks, the 
instance does not make any progress, becomes unresponsive (for example, 
{{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
group.

We might consider using {{wakeup()}} calls to unblock those operations to keep 
the {{StreamThread}} in a responsive state.

Note: there are discussions about adding timeouts to those calls, in which case 
we could get {{TimeoutExceptions}}. This would be easier to handle than using 
{{wakeup()}}, so we should keep an eye on those discussions. 

  was:
Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
{{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block.

If we block within one operation, the whole {{StreamThread}} blocks, the 
instance does not make any progress, becomes unresponsive (for example, 
{{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
group.

We might consider using {{wakeup()}} calls to unblock those operations to keep 
the {{StreamThread}} in a responsive state.

Note: there are discussions about adding timeouts to those calls, in which case 
we could get {{TimeoutExceptions}}. This would be easier to handle than using 
{{wakeup()}}, so we should keep an eye on those discussions. 
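
A minimal sketch of the {{wakeup()}} idea mentioned above (the watchdog timeout 
and helper method are illustrative, not existing Streams code): another thread 
wakes the consumer up, and the blocked call surfaces as a {{WakeupException}} 
so the {{StreamThread}} can stay responsive.

{code:java}
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

// Sketch only: bound an otherwise unbounded commitSync() with a wakeup() watchdog.
static void commitWithDeadline(final KafkaConsumer<byte[], byte[]> consumer,
                               final Map<TopicPartition, OffsetAndMetadata> offsets) {
    final ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
    try {
        watchdog.schedule(consumer::wakeup, 30, TimeUnit.SECONDS);  // wakeup() is thread-safe
        consumer.commitSync(offsets);
    } catch (final WakeupException e) {
        // treat like a timeout: the commit did not complete, but the thread stays responsive
    } finally {
        watchdog.shutdownNow();
    }
}
{code}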


> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check whether the code handles 
> this case correctly.
> If we block within one operation, the whole {{StreamThread}} blocks, the 
> instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> We might consider using {{wakeup()}} calls to unblock those operations to 
> keep the {{StreamThread}} in a responsive state.
> Note: there are discussions about adding timeouts to those calls, in which 
> case we could get {{TimeoutExceptions}}. This would be easier to handle than 
> using {{wakeup()}}, so we should keep an eye on those discussions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-03-27 Thread Matthias J. Sax (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6446:
---
Labels: exactly-once  (was: )

> KafkaProducer with transactionId endless waits when bootstrap server is down
> 
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Eduardo Sciullo
>Assignee: huxihx
>Priority: Critical
>  Labels: exactly-once
> Fix For: 1.2.0
>
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId waits 
> endlessly on initTransactions. 
> The timeouts don't apply to that operation: it doesn't honor 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached is an example of my code to reproduce the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6127) Streams should never block infinitely

2018-03-27 Thread Matthias J. Sax (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6127:
---
Labels: exactly-once  (was: )

> Streams should never block infinitely
> -
>
> Key: KAFKA-6127
> URL: https://issues.apache.org/jira/browse/KAFKA-6127
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: exactly-once
>
> Streams uses three consumer APIs that can block infinitely: {{commitSync()}}, 
> {{committed()}}, and {{position()}}. {{KafkaProducer#send()}} can also block. 
> If EOS is enabled, {{KafkaProducer#initTransactions()}} also used to block 
> (fixed in KAFKA-6446), and we should double-check whether the code handles 
> this case correctly.
> If we block within one operation, the whole {{StreamThread}} blocks, the 
> instance does not make any progress, becomes unresponsive (for example, 
> {{KafkaStreams#close()}} suffers), and we also might drop out of the consumer 
> group.
> We might consider using {{wakeup()}} calls to unblock those operations to 
> keep the {{StreamThread}} in a responsive state.
> Note: there are discussions about adding timeouts to those calls, in which 
> case we could get {{TimeoutExceptions}}. This would be easier to handle than 
> using {{wakeup()}}, so we should keep an eye on those discussions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-27 Thread Matthias J. Sax (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6718:
---
Labels: needs-kip  (was: )

> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
>  Labels: needs-kip
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation, as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> ([KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]), 
> but we needed something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks >= number of racks), then the cluster will be rack aware, 
> i.e. each replica task is assigned to a different rack.
>  # If (totalTasks < number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, 
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps the StickyPartitionAssignor assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6719) Kafka Reassign Partitions Failure

2018-03-27 Thread Srinivas Dhruvakumar (JIRA)
Srinivas Dhruvakumar created KAFKA-6719:
---

 Summary: Kafka Reassign Partitions Failure 
 Key: KAFKA-6719
 URL: https://issues.apache.org/jira/browse/KAFKA-6719
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.2
Reporter: Srinivas Dhruvakumar
 Attachments: Screen Shot 2018-03-27 at 10.27.29 AM.png

The Kafka partition reassignment fails with the following error:

!Screen Shot 2018-03-27 at 10.27.29 AM.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6719) Kafka Reassign Partitions Failure

2018-03-27 Thread Srinivas Dhruvakumar (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Srinivas Dhruvakumar resolved KAFKA-6719.
-
Resolution: Invalid

> Kafka Reassign Partitions Failure 
> --
>
> Key: KAFKA-6719
> URL: https://issues.apache.org/jira/browse/KAFKA-6719
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.2
>Reporter: Srinivas Dhruvakumar
>Priority: Major
> Attachments: Screen Shot 2018-03-27 at 10.27.29 AM.png
>
>
> The Kafka partition reassignment fails with the following error:
> !Screen Shot 2018-03-27 at 10.27.29 AM.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Dustin Cote (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416143#comment-16416143
 ] 

Dustin Cote commented on KAFKA-6417:


[~ChrisEgerton] I think the problem with this is that in practice you could 
have so many jars that also match these criteria that the log becomes chatty and 
the message gets missed (this happens a lot with the ProducerConfig and 
ConsumerConfig warnings that pop out). There would probably be a lot of false 
positives here. 

One thought I have is to enforce an easy to understand policy that no jars are 
allowed at all directly under {{plugin.path}} and just crash the worker saying 
so to "fail fast" in the event someone isn't following convention. That would 
require uber jar users to make a top level directory to contain their jar which 
may be a little annoying but would be easy to understand. This doesn't seem 
like a severe penalty in general considering the real usage pattern but would 
absolutely have compatibility issues with what exists out there today (on 
upgrade you may see trouble). Alternatively, failing an individual connector 
might work too, but I don't know how that would go in practice.

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416164#comment-16416164
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

cemo opened a new pull request #4782: KAFKA-6711: GlobalStateManagerImpl should 
not write offsets of in-mem…
URL: https://github.com/apache/kafka/pull/4782
 
 
   …ory stores in checkpoint file
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable, and I noticed that 
> after each restart I am losing all my data. When I debugged it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains an 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land, so there might be another way to fix the 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-27 Thread JIRA
[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416168#comment-16416168
 ] 

Cemalettin Koç commented on KAFKA-6711:
---

Sorry for opening the issue too quickly :( The bot polluted the issue.

Here is my quick shot at the issue. Please note that I am pretty new to Kafka 
land. Review it twice :)

https://github.com/apache/kafka/pull/4782

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with a GlobalKTable, and I noticed that 
> after each restart I am losing all my data. When I debugged it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains an 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land, so there might be another way to fix the 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Chris Egerton (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416174#comment-16416174
 ] 

Chris Egerton commented on KAFKA-6417:
--

There's also the case where a plugin performs some operations either statically 
or, in the case of a connector, in its no-args constructor, in which case the 
resulting {{ClassNotFoundException}} is thrown while starting up connect (in 
either distributed or standalone mode), and causes the framework to crash.

If this happens, there's no guarantee that any of the non-plugin-containing 
JARs would be scanned before the plugin JAR itself is scanned and the 
framework-halting exception is thrown, so we can't rely on the user having seen 
any warnings beforehand about non-plugin-containing JARs not being used as 
dependencies for other plugins.

Handling this separate case could involve intercepting the 
{{ClassNotFoundException}} and outputting a similar warning message about 
improper plugin path structure, before throwing the same exception and causing 
the framework to halt like before.

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 
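
For concreteness, the layout described in the quoted docs looks like this (paths taken from the example above):

{noformat}
/usr/share/plugins/               <- this directory goes on plugin.path
    myplugin1/                    <- plugin jars plus their dependencies
    myplugin2/

plugin.path=/usr/share/plugins              # dependencies load as expected
plugin.path=/usr/share/plugins/myplugin1    # dependencies not loaded -> ClassNotFoundException
{noformat}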



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Chris Egerton (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416174#comment-16416174
 ] 

Chris Egerton edited comment on KAFKA-6417 at 3/27/18 8:18 PM:
---

Sorry, started working on this comment before I saw yours, [~cotedm]; I'll 
address your points now in a separate one.

 

There's also the case where a plugin performs some operations either statically 
or, in the case of a connector, in its no-args constructor, in which case the 
resulting {{ClassNotFoundException}} is thrown while starting up connect (in 
either distributed or standalone mode), and causes the framework to crash.

If this happens, there's no guarantee that any of the non-plugin-containing 
JARs would be scanned before the plugin JAR itself is scanned and the 
framework-halting exception is thrown, so we can't rely on the user having seen 
any warnings beforehand about non-plugin-containing JARs not being used as 
dependencies for other plugins.

Handling this separate case could involve intercepting the 
{{ClassNotFoundException}} and outputting a similar warning message about 
improper plugin path structure, before throwing the same exception and causing 
the framework to halt like before.


was (Author: chrisegerton):
There's also the case where a plugin performs some operations either statically 
or, in the case of a connector, in its no-args constructor, in which case the 
resulting {{ClassNotFoundException}} is thrown while starting up connect (in 
either distributed or standalone mode), and causes the framework to crash.

If this happens, there's no guarantee that any of the non-plugin-containing 
JARs would be scanned before the plugin JAR itself is scanned and the 
framework-halting exception is thrown, so we can't rely on the user having seen 
any warnings beforehand about non-plugin-containing JARs not being used as 
dependencies for other plugins.

Handling this separate case could involve intercepting the 
{{ClassNotFoundException}} and outputting a similar warning message about 
improper plugin path structure, before throwing the same exception and causing 
the framework to halt like before.

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Chris Egerton (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416201#comment-16416201
 ] 

Chris Egerton commented on KAFKA-6417:
--

Curious--why do you think false positives would be a big issue? Do most users 
(that you know of and/or that have encountered this issue) have nested 
directories included in their {{plugin.path}} (e.g., 
{{plugin.path=foo/bar,foo/bar/baz}}) and/or a bunch of unnecessary JARs lying 
around?

If we make it clear that we're only warning that the JAR we've encountered 
won't be used as a dependency for plugin JARs in the same directory, the 
warning can be safely ignored by anyone reading it who isn't encountering an 
issue instantiating a given plugin but will be useful for anyone who is, as 
long as they don't have nested directories in their {{plugin.path}}.

 

To be clear, your suggestion for altering the plugin scanning behavior is 
essentially that we no longer differentiate uber JARs from other JARs and place 
all JAR files for a plugin in a directory underneath a directory supplied as a 
plugin path, correct? I can ask around to see what people think about that before 
writing up a KIP, but it seems like a bit of a nuclear option given the 
compatibility issues you mentioned.

 

RE: failing individual plugins: I'd have to dig into the code base more to verify 
this, but that could be as simple as adding another {{catch}} block for the 
{{ClassNotFoundException}} that gets thrown under these circumstances and 
outputting an appropriate error message about not being able to use the 
connector, similar to [what is already 
done|https://github.com/apache/kafka/blob/395c7e0f0985b424ea2bc2bd40c0237eada24dcf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L183-L189]
 in the plugin scanning phase for Connect. Thoughts?
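
A minimal, hypothetical sketch of that per-plugin failure mode (illustrative names only; this is not the DelegatingClassLoader code):

{code}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a plugin class that cannot be loaded is reported and skipped,
// while the worker keeps running with the remaining plugins.
public class PluginScanSketch {
    static List<Class<?>> loadAll(List<String> pluginClassNames, ClassLoader pluginLoader) {
        List<Class<?>> loaded = new ArrayList<>();
        for (String name : pluginClassNames) {
            try {
                loaded.add(Class.forName(name, true, pluginLoader));
            } catch (ClassNotFoundException e) {
                // log and move on instead of halting the framework
                System.err.println("Could not load plugin class '" + name + "'; check that "
                        + "plugin.path points at the parent of the plugin directory.");
            }
        }
        return loaded;
    }
}
{code}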

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Dustin Cote (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416232#comment-16416232
 ] 

Dustin Cote commented on KAFKA-6417:


[~ChrisEgerton] yeah it's a bit extreme probably to kill the whole worker, so 
just failing the non-conforming connector is better IMO. However, the 
convention today is pretty convoluted and I think the pain of breaking 
compatibility (for some major release) isn't so bad.

We do have lots of users with polluted classpaths, especially coming from a 
Hadoop world where they have hundreds of JARs on their system that they 
don't really manage themselves. The original report of this actually came from 
that type of situation, and once you're there it's hard to figure out what's 
going on. Adding more detail to the log might help if you know what you are 
looking for a priori, but coming at it from just a ClassNotFoundException with 
hundreds of JARs to sift through, the point gets lost a bit. A user may be 
thinking "if I'm missing a class, why do I have this repeated message that these 
JARs won't be used as a dependency?" That is a more complex problem for a user 
to solve than "my worker said it went down because I have JARs where they 
shouldn't be". The corrective action for the former is pretty unclear; for the 
latter the action is clear, and if a class isn't found in that case you 
know which directory to look in.

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416246#comment-16416246
 ] 

ASF GitHub Bot commented on KAFKA-6473:
---

mjsax closed pull request #4736: KAFKA-6473: Add MockProcessorContext to public 
test-utils
URL: https://github.com/apache/kafka/pull/4736
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 8de03efeba0..d5fd7d5fd7b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1002,6 +1002,10 @@ project(':streams:examples') {
 compile project(':streams')
 compile project(':connect:json')  // this dependency should be removed 
after we unify data API
 compile libs.slf4jlog4j
+
+testCompile project(':clients').sourceSets.test.output // for 
org.apache.kafka.test.IntegrationTest
+testCompile project(':streams:test-utils')
+testCompile libs.junit
   }
 
   javadoc {
diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 34ac89ffe05..8552bcc8674 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -66,6 +66,7 @@
 
 
 Writing streams back to 
Kafka
+Testing a Streams application
 
 
 
@@ -3154,6 +3155,10 @@ Overview
+Testing a Streams application
+Kafka Streams comes with a test-utils module to help 
you test your application here.
+
 
 
 
diff --git a/docs/streams/developer-guide/processor-api.html 
b/docs/streams/developer-guide/processor-api.html
index b51bc22cfe2..e3432b79b7c 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -41,13 +41,16 @@
 Table of Contents
 
 Overview
-Defining a Stream 
Processor
-State Stores
-Defining and creating a 
State Store
-Fault-tolerant State 
Stores
-Enable or Disable Fault Tolerance of State Stores (Store 
Changelogs)
-Implementing Custom State 
Stores
-
+Defining a Stream
+Processor
+Unit Testing Processors
+State Stores
+
+Defining and creating a 
State Store
+Fault-tolerant State 
Stores
+Enable or Disable Fault Tolerance of State Stores (Store 
Changelogs)
+Implementing Custom State 
Stores
+
 
 Connecting Processors 
and State Stores
 
@@ -98,11 +101,12 @@ OverviewPunctuationType types within the same processor by 
calling ProcessorContext#schedule() multiple
 times inside init() method.
 
-Attention
+Attention
 Stream-time is only advanced if all input 
partitions over all input topics have new data (with newer timestamps) 
available.
 If at least one partition does not have any new data 
available, stream-time will not be advanced and thus punctuate() will not be triggered if 
PunctuationType.STREAM_TIME was specified.
 This behavior is independent of the configured timestamp 
extractor, i.e., using WallclockTimestampExtractor does not enable 
wall-clock triggering of punctuate().
 
+Example
 The following example Processor defines a simple word-count algorithm and 
the following actions are performed:
 
 In the init() method, schedule the punctuation every 1000 
time units (the time unit is normally milliseconds, which in this example would 
translate to punctuation every 1 second) and retrieve the local state store by 
its name “Counts”.
@@ -159,6 +163,16 @@ Overviewstate stores documentation.
 
 
+
+
+Unit Testing Processors
+
+
+
+Kafka Streams comes with a test-utils module to 
help you write unit tests for your
+processors here.
+
+
 
 State Stores
 To implement a stateful Processor or Transformer, you must provide one or 
more state stores to the processor
diff --git a/docs/streams/developer-guide/testing.html 
b/docs/streams/developer-guide/testing.html
index e6886a1689f..ea2ae987c7e 100644
--- a/docs/streams/developer-guide/testing.html
+++ b/docs/streams/developer-guide/testing.

[jira] [Resolved] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-03-27 Thread Matthias J. Sax (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6473.

   Resolution: Fixed
Fix Version/s: 1.2.0

> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
> Fix For: 1.2.0
>
>
> With KIP-247, we added public test-utils artifact with a TopologyTestDriver 
> class. Using the test driver for a single 
> Processor/Transformer/ValueTransformer it's required to specify a whole 
> topology with source and sink and plus the 
> Processor/Transformer/ValueTransformer into it.
> For unit testing, it might be more convenient to have a MockProcessorContext, 
> that can be used to test the Processor/Transformer/ValueTransformer in 
> isolation. Ie, the test itself creates new 
> Processor/Transformer/ValueTransformer object and calls init() manually 
> passing in the MockProcessorContext.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-27 Thread Vikas Tikoo (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416281#comment-16416281
 ] 

Vikas Tikoo commented on KAFKA-5882:


We have encountered this issue 19 times in the last 15 days, and always around 
redeploys. Running kafka-streams v0.11.0.0.

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6720) Inconsistent Kafka Streams behaviour when topic does not exist

2018-03-27 Thread Daniel Wojda (JIRA)
Daniel Wojda created KAFKA-6720:
---

 Summary: Inconsistent Kafka Streams behaviour when topic does not 
exist
 Key: KAFKA-6720
 URL: https://issues.apache.org/jira/browse/KAFKA-6720
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.1
Reporter: Daniel Wojda


When Kafka Streams starts, it reads metadata about the topics used in the topology
 and their partitions. If the topology of that stream contains a stateful operation 
like #join and a topic does not exist, a 
[TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719]
 will be thrown.

In the case of a stream with a simple topology containing stateless operations only, like 
#mapValue, if a topic does not exist Kafka Streams does not throw any 
exception; it just logs a warning:
 ["log.warn("No partitions found for topic {}", 
topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435]
 

I believe the behaviour of Kafka Streams in both cases should be the same, and 
it should throw TopologyBuilderException.

I am more than happy to prepare a Pull Request if it is a valid issue.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6720) Inconsistent Kafka Streams behaviour when topic does not exist

2018-03-27 Thread Mariam John (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mariam John resolved KAFKA-6720.

Resolution: Duplicate

This is similar to KAFKA-6437.

> Inconsistent Kafka Streams behaviour when topic does not exist
> --
>
> Key: KAFKA-6720
> URL: https://issues.apache.org/jira/browse/KAFKA-6720
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Daniel Wojda
>Priority: Minor
>
> When Kafka Streams starts, it reads metadata about the topics used in the topology
>  and their partitions. If the topology of that stream contains a stateful operation 
> like #join and a topic does not exist, a 
> [TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719]
>  will be thrown.
> In the case of a stream with a simple topology containing stateless operations only, like 
> #mapValue, if a topic does not exist Kafka Streams does not throw any 
> exception; it just logs a warning:
>  ["log.warn("No partitions found for topic {}", 
> topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435]
>  
> I believe the behaviour of Kafka Streams in both cases should be the same, 
> and it should throw TopologyBuilderException.
> I am more than happy to prepare a Pull Request if it is a valid issue.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-27 Thread Srinivas Dhruvakumar (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416382#comment-16416382
 ] 

Srinivas Dhruvakumar commented on KAFKA-6649:
-

I am trying out the patch "high watermark could be incorrectly set to -1". But 
I am unable to reproduce the above scenario "

: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment 
the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it 
is larger than the high watermark -1

"

Does anyone know how to reproduce the above error ? 

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or the exception should be caught and we should 
> keep calling doWork()
>  
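
For illustration only, a hypothetical Java analogue of the alternative raised above (the real class is Scala's ShutdownableThread): with the catch inside the loop, a non-fatal error is logged and doWork() keeps being called instead of the thread silently stopping.

{code}
// Hypothetical Java analogue, not Kafka's ShutdownableThread: the catch sits inside the loop,
// so a non-fatal error is logged and the thread keeps calling doWork().
public abstract class ResilientWorker extends Thread {
    private volatile boolean running = true;

    protected abstract void doWork();

    @Override
    public void run() {
        while (running) {
            try {
                doWork();
            } catch (RuntimeException e) {
                System.err.println("Error due to " + e + "; continuing");
            }
        }
        System.out.println("Stopped");
    }

    public void shutdown() {
        running = false;
    }
}
{code}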



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-27 Thread Matthias J. Sax (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416425#comment-16416425
 ] 

Matthias J. Sax commented on KAFKA-5882:


Thanks for reporting this. Can you upgrade to 0.11.0.2, or even better 1.0.1? 
Also note that 1.1 should be released shortly. 

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416428#comment-16416428
 ] 

ASF GitHub Bot commented on KAFKA-6716:
---

huxihx opened a new pull request #4783: KAFKA-6716: Should close the 
`discardChannel` in completeSend
URL: https://github.com/apache/kafka/pull/4783
 
 
   KAFKA-6716: Should close the `discardChannel` in completeSend
   https://issues.apache.org/jira/browse/KAFKA-6716
   
   Should close the `discardChannel` in MockSelector#completeSend
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> discardChannel should be released in MockSelector#completeSend
> --
>
> Key: KAFKA-6716
> URL: https://issues.apache.org/jira/browse/KAFKA-6716
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> private void completeSend(Send send) throws IOException {
> // Consume the send so that we will be able to send more requests to 
> the destination
> ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
> while (!send.completed()) {
> send.writeTo(discardChannel);
> }
> completedSends.add(send);
> }
> {code}
> The {{discardChannel}} should be closed before returning from the method
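
A minimal sketch of what the suggested fix might look like, assuming {{ByteBufferChannel}} (the Kafka-internal channel type quoted above) can simply be closed; this is a sketch, not necessarily the code in the PR:

{code}
// Sketch: release the discard channel even if writeTo throws, using try/finally.
private void completeSend(Send send) throws IOException {
    ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
    try {
        // Consume the send so that we will be able to send more requests to the destination
        while (!send.completed()) {
            send.writeTo(discardChannel);
        }
        completedSends.add(send);
    } finally {
        discardChannel.close();
    }
}
{code}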



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend

2018-03-27 Thread huxihx (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-6716:
-

Assignee: huxihx

> discardChannel should be released in MockSelector#completeSend
> --
>
> Key: KAFKA-6716
> URL: https://issues.apache.org/jira/browse/KAFKA-6716
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>
> {code}
> private void completeSend(Send send) throws IOException {
> // Consume the send so that we will be able to send more requests to 
> the destination
> ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
> while (!send.completed()) {
> send.writeTo(discardChannel);
> }
> completedSends.add(send);
> }
> {code}
> The {{discardChannel}} should be closed before returning from the method



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6721) Consolidate state store management for global stores and normal stores

2018-03-27 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6721:


 Summary: Consolidate state store management for global stores and 
normal stores
 Key: KAFKA-6721
 URL: https://issues.apache.org/jira/browse/KAFKA-6721
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today the internal logic for handling global state store restoration and update, 
and for normal store restoration, is separated into two sets of classes. Hence 
whenever we update the logic for one of them we need to do the same for the 
other, which is easy to forget, causing regressions.

As a tech-debt cleanup we should consider consolidating the logic of global 
state stores into `StateRestorer` and `StoreChangelogReader` if possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Chris Egerton (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416438#comment-16416438
 ] 

Chris Egerton commented on KAFKA-6417:
--

Alright, given that insight I agree that issuing warnings for unused JARs would 
just end up flooding logs with often-unnecessary messages.

To clarify, do you think it'd make more sense at this point to pursue changing 
the plugin structure or to catch the {{ClassNotFoundException}} caused by 
improper structure and add an error message about said improper structure 
before failing the connector? Open to either at this point, although still 
leaning away from changing plugin structure a small amount.

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6386) Deprecate KafkaStreams constructor taking StreamsConfig parameter

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416444#comment-16416444
 ] 

ASF GitHub Bot commented on KAFKA-6386:
---

guozhangwang closed pull request #4354: KAFKA-6386:use Properties instead of 
StreamsConfig in KafkaStreams constructor
URL: https://github.com/apache/kafka/pull/4354
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index baf9633a0c3..464854c57ad 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,7 +86,11 @@ Streams API
 to let users specify inner serdes if the default serde classes are 
windowed serdes.
 For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs";>KIP-265.
 /
-
+
+ 
+   We have deprecated StreamsConfig in 
KafkaStreams constructors. Now we only take in 
java.util.Properties since StreamsConfig is immutable 
and is created from a Properties object itself.
+For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor";>KIP-245.
+
 
   Kafka 1.2.0 allows to manipulate timestamps of output records using the 
Processor API (https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API";>KIP-251).
   To enable this new feature, ProcessorContext#forward(...) 
was modified.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 2ea5218d647..8a6ec05b4ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -26,7 +26,7 @@
 /**
  * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to 
a {@link KafkaStreams} instance.
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, 
StreamsConfig, KafkaClientSupplier)
+ * @see KafkaStreams#KafkaStreams(Topology, java.util.Properties, 
KafkaClientSupplier)
  */
 public interface KafkaClientSupplier {
 /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 1a70e4638ef..186276c22d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -515,31 +515,18 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 }
 
 /**
- * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+ * Create a {@code KafkaStreams} instance.
+ * 
+ * Note: even if you never call {@link #start()} on a {@code KafkaStreams} 
instance,
+ * you still must {@link #close()} it to avoid resource leaks.
+ *
+ * @param topology the topology specifying the computational logic
+ * @param propsproperties for {@link StreamsConfig}
+ * @throws StreamsException if any fatal error occurs
  */
-@Deprecated
-public KafkaStreams(final 
org.apache.kafka.streams.processor.TopologyBuilder builder,
+public KafkaStreams(final Topology topology,
 final Properties props) {
-this(builder.internalTopologyBuilder, new StreamsConfig(props), new 
DefaultKafkaClientSupplier());
-}
-
-/**
- * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
- */
-@Deprecated
-public KafkaStreams(final 
org.apache.kafka.streams.processor.TopologyBuilder builder,
-final StreamsConfig config) {
-this(builder.internalTopologyBuilder, config, new 
DefaultKafkaClientSupplier());
-}
-
-/**
- * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, 
KafkaClientSupplier)} instead
- */
-@Deprecated
-public KafkaStreams(final 
org.apache.kafka.streams.processor.TopologyBuilder builder,
-final StreamsConfig config,
-final KafkaClientSupplier clientSupplier) {
-this(builder.internalTopologyBuilder, config, clientSupplier);
+this(topology.internalTopologyBuilder, new StreamsConfig(props), new 
DefaultKafkaClientSupplier());
 }
 
 /**
@@ -548,13 +535,16 @@ public KafkaStreams(final 
org.apache.kafka.streams.processor.TopologyBuilder bui
  * Note: even if you never call {@link #start()} on a {@code KafkaStreams} 
instance,
  * you still mu

[jira] [Closed] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-03-27 Thread John Roesler (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-6473.
---

The PR for this work is merged: https://github.com/apache/kafka/pull/4736

> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
> Fix For: 1.2.0
>
>
> With KIP-247, we added public test-utils artifact with a TopologyTestDriver 
> class. Using the test driver for a single 
> Processor/Transformer/ValueTransformer it's required to specify a whole 
> topology with source and sink and plus the 
> Processor/Transformer/ValueTransformer into it.
> For unit testing, it might be more convenient to have a MockProcessorContext, 
> that can be used to test the Processor/Transformer/ValueTransformer in 
> isolation. Ie, the test itself creates new 
> Processor/Transformer/ValueTransformer object and calls init() manually 
> passing in the MockProcessorContext.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-27 Thread Guozhang Wang (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416494#comment-16416494
 ] 

Guozhang Wang commented on KAFKA-6437:
--

There is an interesting issue reported in KAFKA-6720: for join-involved 
topics, if a topic does not exist yet an exception will be thrown. I think it is not 
a complete duplicate of this ticket, and I'd like to summarize the 
"inconsistent" behavior that we are facing today:

1) For a join operation on user topics directly (i.e. no reshuffling added, as 
Streams assumes the input topics are already partitioned by key), we require the user 
topics to pre-exist; if they do not, we throw TopologyBuilderException.
2) For a join operation on repartition topics, since they are not available at the 
assignment phase we "assume" the repartition topics will be created and become 
available, hence we do not check whether the source topics are available. When a 
source topic is missing, and hence no data is sent to the repartition 
topics at all, Streams will hang (this is what this JIRA reported).
3) For stateless operations, if a source topic is missing, Streams will 
continue but generate a warning.

So I think the actual fix should be along these lines:

1) We can [collect all external topics' 
num.partitions|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L425-L437]
 at the very beginning of the assign() phase, and log a warning entry if some 
topic's metadata cannot be found.

2) Having done step one, we do not need to [query the 
metadata|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L341]
 again; we can read it directly from the collected num.partitions map.

3) Finally, in ensureCopartitioning, if the metadata cannot be found we skip 
the [co-partition checking 
phase|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L665]
 but log another warning that "since the topic is not found, we will skip the 
co-partition validation .."


> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The applications does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue an `warn` or `error` level message at start to inform about the missing 
> topic an

[jira] [Comment Edited] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Chris Egerton (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416438#comment-16416438
 ] 

Chris Egerton edited comment on KAFKA-6417 at 3/28/18 12:14 AM:


[~cotedm] Alright, given that insight I agree that issuing warnings for unused 
JARs would just end up flooding logs with often-unnecessary messages and a 
different approach is warranted.

To clarify, do you think it'd make more sense at this point to pursue changing 
the plugin structure or to catch the {{ClassNotFoundException}} caused by 
improper structure and add an error message about said improper structure 
before failing the connector? Open to either at this point, although still 
leaning away from changing plugin structure a small amount.


was (Author: chrisegerton):
Alright, given that insight I agree that issuing warnings for unused JARs would 
just end up flooding logs with often-unnecessary messages.

To clarify, do you think it'd make more sense at this point to pursue changing 
the plugin structure or to catch the {{ClassNotFoundException}} caused by 
improper structure and add an error message about said improper structure 
before failing the connector? Open to either at this point, although still 
leaning away from changing plugin structure a small amount.

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6722) SensorAccess.getOrCreate should be more efficient

2018-03-27 Thread wade wu (JIRA)
wade wu created KAFKA-6722:
--

 Summary: SensorAccess.getOrCreate should be more efficient
 Key: KAFKA-6722
 URL: https://issues.apache.org/jira/browse/KAFKA-6722
 Project: Kafka
  Issue Type: Improvement
Reporter: wade wu


The lock/unlock of the read lock in getOrCreate() is not necessary, or it should be 
refactored. For each request from a producer, this read lock is locked and 
unlocked, which costs time. It can be easily fixed using the code below, and it 
is still thread safe: 

 

var sensor: Sensor = metrics.getSensor(sensorName)

if (sensor == null) {

lock.writeLock().lock()

try{
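
For illustration, a hypothetical Java sketch of the pattern being proposed (the actual code in SensorAccess is Scala, and the types below are stand-ins, not Kafka's Sensor or Metrics classes): check without the read lock first, then re-check under the write lock before creating.

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the double-checked pattern described above; not Kafka's SensorAccess.
public class SensorRegistrySketch {
    static final class Sensor { }

    private final Map<String, Sensor> sensors = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    Sensor getOrCreate(String sensorName) {
        Sensor sensor = sensors.get(sensorName);      // cheap check, no read lock on the hot path
        if (sensor == null) {
            lock.writeLock().lock();
            try {
                sensor = sensors.get(sensorName);     // re-check: another thread may have created it
                if (sensor == null) {
                    sensor = new Sensor();
                    sensors.put(sensorName, sensor);
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
        return sensor;
    }
}
{code}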





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6722) SensorAccess.getOrCreate should be more efficient

2018-03-27 Thread wade wu (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wade wu updated KAFKA-6722:
---
Description: 
The lock/unlock of the read lock in getOrCreate() is not necessary, or it should be 
refactored. For each request from a producer, this read lock is locked and 
unlocked, which costs time. 

The existing code does this in order to wait until sensor initialization is 
finished, but that can be handled when the sensor is created under the write 
lock, by having the thread sleep for a short while (a few milliseconds); this 
cost can be amortized, since creating a sensor is a one-time thing.

It can be easily fixed using the code below, and it is still thread safe: 

 

var sensor: Sensor = metrics.getSensor(sensorName)

if (sensor == null) {

lock.writeLock().lock()

try{



  was:
The lock/unlock of read lock in getOrCreate() is not necessary, or it should be 
refactored. For each request from Producer, this read lock lock/unlock is 
called and lock/unlock, it is costing the time. It can be easily fixed using 
code below, and it is still thread safe: 

 

var sensor: Sensor = metrics.getSensor(sensorName)

if (sensor == null) {

lock.writeLock().lock()

try{




> SensorAccess.getOrCreate should be more efficient
> -
>
> Key: KAFKA-6722
> URL: https://issues.apache.org/jira/browse/KAFKA-6722
> Project: Kafka
>  Issue Type: Improvement
>Reporter: wade wu
>Priority: Major
>
> The lock/unlock of the read lock in getOrCreate() is not necessary, or it should 
> be refactored. For each request from a producer, this read lock is locked and 
> unlocked, which costs time. 
> The existing code does this in order to wait until sensor 
> initialization is finished, but that can be handled when the sensor is created 
> under the write lock, by having the thread sleep for a short while (a few 
> milliseconds); this cost can be amortized, since creating a sensor is a one-time 
> thing.
> It can be easily fixed using the code below, and it is still thread safe: 
>  
> var sensor: Sensor = metrics.getSensor(sensorName)
> if (sensor == null) {
> lock.writeLock().lock()
> try{
> 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-27 Thread Dustin Cote (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416542#comment-16416542
 ] 

Dustin Cote commented on KAFKA-6417:


I think catching the exception and suggesting an action to correct the problem 
is probably more sensible in the immediate term and may be good enough to keep 
people from doing something silly. I just wasn't sure how easy that is to 
implement. If it's not risky (i.e. won't catch something else we should be 
worried about) then I'm all for it :)

Later on if people still struggle, then it's probably just too confusing and we 
can do something structurally easier to understand. Hope that helps!

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-27 Thread Srinivas Dhruvakumar (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416382#comment-16416382
 ] 

Srinivas Dhruvakumar edited comment on KAFKA-6649 at 3/28/18 2:31 AM:
--

I am trying out the patch "high watermark could be incorrectly set to -1" 
KAFKA-3978. But I am unable to reproduce the above scenario "

: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment 
the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it 
is larger than the high watermark -1

"

Does anyone know how to reproduce the above error ? 


was (Author: srinivas.d...@gmail.com):
I am trying out the patch "high watermark could be incorrectly set to -1". But 
I am unable to reproduce the above scenario "

: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment 
the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it 
is larger than the high watermark -1

"

Does anyone know how to reproduce the above error ? 

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or the exception should be caught and we should 
> keep calling doWork()
>  
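For illustration only (Kafka's ShutdownableThread is Scala, and the sketch 
below is not its actual code), the alternative raised in the last paragraph, 
logging a non-fatal exception and continuing to call doWork(), could look 
like this:

{code:java}
// Illustrative sketch, not Kafka code: a worker loop that logs non-fatal
// exceptions and keeps calling doWork() instead of silently stopping.
abstract class ResilientLoop implements Runnable {
    private volatile boolean running = true;

    protected abstract void doWork() throws Exception;

    public void shutdown() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                doWork();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // treat interruption as shutdown
                break;
            } catch (Exception e) {
                // Non-fatal: log and keep working rather than exiting the loop.
                System.err.println("Error due to: " + e);
            }
        }
    }
}
{code}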



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-03-27 Thread Matthias J. Sax (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6473:
---
Labels: kip user-experience  (was: needs-kip user-experience)

> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip, user-experience
> Fix For: 1.2.0
>
>
> With KIP-247, we added the public test-utils artifact with a TopologyTestDriver 
> class. Using the test driver for a single 
> Processor/Transformer/ValueTransformer requires specifying a whole topology 
> with a source and sink, plus the Processor/Transformer/ValueTransformer wired 
> into it.
> For unit testing, it might be more convenient to have a MockProcessorContext 
> that can be used to test the Processor/Transformer/ValueTransformer in 
> isolation. I.e., the test itself creates a new 
> Processor/Transformer/ValueTransformer object and calls init() manually, 
> passing in the MockProcessorContext.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
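As a sketch of the proposed test style (MockProcessorContext did not exist 
when this ticket was filed and only shipped later in kafka-streams-test-utils; 
UpperCaseProcessor is a made-up example, not a Kafka class):

{code:java}
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch of the intended test style: wire the mock context into the processor
// directly, without building a topology or using the TopologyTestDriver.
public class UpperCaseProcessorTest {

    static class UpperCaseProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final String value) {
            context.forward(key, value.toUpperCase());
        }

        @Override
        public void close() {}
    }

    public static void main(final String[] args) {
        final MockProcessorContext context = new MockProcessorContext();
        final Processor<String, String> processor = new UpperCaseProcessor();
        processor.init(context);             // the test wires the context in manually
        processor.process("k", "hello");
        // the mock context captures forwarded records for assertions
        context.forwarded().forEach(captured -> System.out.println(captured.keyValue()));
    }
}
{code}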



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-27 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-6723:
--

 Summary: Separate "max.poll.record" for restore consumer and 
common consumer
 Key: KAFKA-6723
 URL: https://issues.apache.org/jira/browse/KAFKA-6723
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen


Currently, Kafka Streams uses the `max.poll.records` config for both the restore 
consumer and the normal stream consumer. In reality they handle different 
processing workloads, and to speed up restoration the restore consumer should be 
able to achieve higher throughput by setting `max.poll.records` higher. The 
change involved is trivial: 
[https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]

However, this is still a public API change (it introduces a new config name), so 
we need a KIP.
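For context, this is how the setting is passed today: the single 
consumer-prefixed value is applied to both the main consumer and the restore 
consumer, which is exactly what this ticket proposes to split (the broker 
address and application id below are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RestoreConsumerConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        // Today this single value is applied to BOTH the main consumer and the
        // restore consumer; there is no supported way yet to give the restore
        // consumer a higher max.poll.records on its own, hence this ticket.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000);
        // props would then be handed to new KafkaStreams(topology, props)
    }
}
{code}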



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-27 Thread Boyang Chen (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416749#comment-16416749
 ] 

Boyang Chen commented on KAFKA-6723:


[~guozhang] [~liquanpei] [~mjsax] Thoughts on this?

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the 
> restore consumer and the normal stream consumer. In reality they handle 
> different processing workloads, and to speed up restoration the restore 
> consumer should be able to achieve higher throughput by setting 
> `max.poll.records` higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (it introduces a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6642) Rack aware replica assignment in kafka streams

2018-03-27 Thread Ashish Surana (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778
 ] 

Ashish Surana commented on KAFKA-6642:
--

The current task assignor is sticky, and it can be made rack-aware, ensuring 
that the active and standby copies of the same task are assigned to different 
racks as much as possible.

Approach
 # A RACK_ID is added to StreamsConfig and needs to be passed when starting the 
kafka-streams application. All processes with the same RACK_ID are considered 
to be in the same rack.
 # No changes to the input-topic-partition-to-task assignment.

 

Assignment of tasks to stream instances:
 # We assign active tasks to the instances that previously hosted the same task 
as active.
 # Active tasks that couldn't be assigned in the first step are assigned to the 
instances that previously hosted the same task as standby.
 # Active tasks that still couldn't be assigned are assigned to instances in a 
round-robin fashion, starting from the least-loaded instance.
 # The above three steps are the same as in the StickyAssignor; since there is 
only one active copy of each task, no extra rack-aware logic is required here.
 # Now we have to assign standby tasks, and here we assign each standby to an 
instance in a rack different from the one where its active task (or other 
standbys of the same task) is running. If we run out of racks, we can assign 
standby tasks in the same rack but on different instances.
 # This makes the assignment rack-aware, but it is best effort and doesn't 
guarantee anything, because some racks might not have capacity left, or there 
might be more replicas than racks, etc.

Note: here we are making the current StickyTaskAssignor rack-aware without 
changing its logic drastically. For example, the current assignor is only 
sticky for active tasks; standby task assignment is not sticky, as it doesn't 
look at where the task was previously assigned. A minimal sketch of the standby 
placement step is shown below.
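A minimal sketch of the best-effort standby placement described in the last two 
list items, assuming a plain map from instance id to rack id (illustrative 
only, not StickyTaskAssignor code):

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch only: pick an instance for a standby task, preferring a rack different
// from the one hosting the active task and falling back to any other instance
// when we run out of racks. Instance and rack ids are plain strings here.
public final class RackAwareStandbyPlacer {

    private static final String DEFAULT_RACK = "default-rack";

    private final Map<String, String> rackByInstance;   // instanceId -> rackId

    public RackAwareStandbyPlacer(final Map<String, String> rackByInstance) {
        this.rackByInstance = rackByInstance;
    }

    public Optional<String> pickStandbyInstance(final String activeInstance,
                                                final Set<String> candidates) {
        final String activeRack = rackByInstance.getOrDefault(activeInstance, DEFAULT_RACK);
        // First choice: an instance in a different rack than the active task.
        for (final String candidate : candidates) {
            if (!candidate.equals(activeInstance)
                    && !rackByInstance.getOrDefault(candidate, DEFAULT_RACK).equals(activeRack)) {
                return Optional.of(candidate);
            }
        }
        // Out of racks: accept the same rack as long as it is a different instance.
        return candidates.stream()
                .filter(candidate -> !candidate.equals(activeInstance))
                .findFirst();
    }
}
{code}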

> Rack aware replica assignment in kafka streams
> --
>
> Key: KAFKA-6642
> URL: https://issues.apache.org/jira/browse/KAFKA-6642
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware 
> replica 
> assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]).
> This request is to have a similar feature for kafka streams applications. 
> Standby tasks/standby replica assignment in kafka streams is currently not 
> rack aware, and this request is to make it rack aware for better availability.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6642) Rack aware replica assignment in kafka streams

2018-03-27 Thread Ashish Surana (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778
 ] 

Ashish Surana edited comment on KAFKA-6642 at 3/28/18 4:12 AM:
---

The current task assignor is sticky, and it can be made rack-aware, ensuring 
that the active and standby copies of the same task are assigned to different 
racks as much as possible.

Approach
 # A RACK_ID is added to StreamsConfig and needs to be passed when starting the 
kafka-streams application. All processes with the same RACK_ID are considered 
to be in the same rack.
 # No changes to the input-topic-partition-to-task assignment.

 

Assignment of tasks to stream instances:
 # We assign active tasks to the instances that previously hosted the same task 
as active.
 # Active tasks that couldn't be assigned in the first step are assigned to the 
instances that previously hosted the same task as standby.
 # Active tasks that still couldn't be assigned are assigned to instances in a 
round-robin fashion, starting from the least-loaded instance.
 # The above three steps are the same as in the StickyAssignor; since there is 
only one active copy of each task, no extra rack-aware logic is required here.
 # Now we have to assign standby tasks, and here we assign each standby to an 
instance in a rack different from the one where its active task (or other 
standbys of the same task) is running. If we run out of racks, we can assign 
standby tasks in the same rack but on different instances.
 # This makes the assignment rack-aware, but it is best effort and doesn't 
guarantee anything, because some racks might not have capacity left, or there 
might be more replicas than racks, etc.

Note: here we are making the current StickyTaskAssignor rack-aware without 
changing its logic drastically. For example, the current assignor is only 
sticky for active tasks; standby task assignment is not sticky, as it doesn't 
look at where the task was previously assigned.

Scenario#1

RACK_ID is not passed for any of the stream instances.

In this case, assignment happens as it does today with the StickyTaskAssignor. 
All the instances for which RACK_ID is not passed are considered to be part of 
a single default rack.

 

Scenario#2

RACK_ID is passed for all the stream instances.

In this case, every instance belongs to one rack or another, and assignment is 
rack-aware as per the approach above.

 

Scenario#3

RACK_ID is passed for some stream instances but not all.

In this case, all the instances with a RACK_ID belong to the provided racks, 
and all the instances for which RACK_ID was not passed are considered to be 
part of a single default rack.

 

Please let us know what you think about this approach.


was (Author: asurana):
Current task assignor is sticky, and it can be made rack-aware. Where we ensure 
that same tasks (active & replicas) are assigned different racks as much as 
possible.

Approach
 # RACK_ID is added in StreamsConfig file, and needs to be passed while 
starting kafka-streams application. All the processes having same rack_id are 
considered in the same rack.
 # No changes in input topic partition to task assignment

 

Assignment of tasks to stream instances:
 # We assign active tasks to the instances which were having same task as 
active previously.
 # Active Tasks which couldn't be assigned in first step are assigned to the 
instances which were having same task as standby previously.
 # Active tasks which still couldn't be assigned to instances in round-robin 
starting from least-loaded instance
 # Above 3 steps are same as StickyAssignor as there is only one unique active 
task so no extra rack aware logic is required in this step.
 # Now we have to assign standy-tasks, and here we assign standby to instances 
in different rack then it's active task or other standy-tasks are running. If 
we run out of racks then we can assign standby-tasks in same rack but different 
instances.
 # This makes the assignment rack-aware but more of a best effort and doesn't 
guarantee anything. This is because we might not have capacity left in some 
racks or we might have more number of replicas than number of racks etc

Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't 
change the logic drastically. For example, current assignor is only sticky for 
active tasks, and standby task assignment logic is not sticky as it doesn't 
look for where the task was assigned previously.

> Rack aware replica assignment in kafka streams
> --
>
> Key: KAFKA-6642
> URL: https://issues.apache.org/jira/browse/KAFKA-6642
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware 
> replic

[jira] [Comment Edited] (KAFKA-6642) Rack aware replica assignment in kafka streams

2018-03-27 Thread Ashish Surana (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778
 ] 

Ashish Surana edited comment on KAFKA-6642 at 3/28/18 4:13 AM:
---

The current task assignor is sticky, and it can be made rack-aware, ensuring 
that the active and standby copies of the same task are assigned to different 
racks as much as possible.

Approach
 # A RACK_ID is added to StreamsConfig and needs to be passed when starting the 
kafka-streams application. All processes with the same RACK_ID are considered 
to be in the same rack.
 # No changes to the input-topic-partition-to-task assignment.

 

Assignment of tasks to stream instances:
 # We assign active tasks to the instances that previously hosted the same task 
as active.
 # Active tasks that couldn't be assigned in the first step are assigned to the 
instances that previously hosted the same task as standby.
 # Active tasks that still couldn't be assigned are assigned to instances in a 
round-robin fashion, starting from the least-loaded instance.
 # The above three steps are the same as in the StickyAssignor; since there is 
only one active copy of each task, no extra rack-aware logic is required here.
 # Now we have to assign standby tasks, and here we assign each standby to an 
instance in a rack different from the one where its active task (or other 
standbys of the same task) is running. If we run out of racks, we can assign 
standby tasks in the same rack but on different instances.
 # This makes the assignment rack-aware, but it is best effort and doesn't 
guarantee anything, because some racks might not have capacity left, or there 
might be more replicas than racks, etc.

Note: here we are making the current StickyTaskAssignor rack-aware without 
changing its logic drastically. For example, the current assignor is only 
sticky for active tasks; standby task assignment is not sticky, as it doesn't 
look at where the task was previously assigned.

Scenario#1

RACK_ID is not passed for any of the stream instances.

In this case, assignment happens as it does today with the StickyTaskAssignor. 
All the instances for which RACK_ID is not passed are considered to be part of 
a single default rack.

 

Scenario#2

RACK_ID is passed for all the stream instances.

In this case, every instance belongs to one rack or another, and assignment is 
rack-aware as per the approach above.

 

Scenario#3

RACK_ID is passed for some stream instances but not all.

In this case, all the instances with a RACK_ID belong to the provided racks, 
and all the instances for which RACK_ID was not passed are considered to be 
part of a single default rack.

 

Please let us know what you think about this approach.


was (Author: asurana):
Current task assignor is sticky, and it can be made rack-aware. Where we ensure 
that same tasks (active & replicas) are assigned different racks as much as 
possible.

Approach
 # RACK_ID is added in StreamsConfig file, and needs to be passed while 
starting kafka-streams application. All the processes having same rack_id are 
considered in the same rack.
 # No changes in input topic partition to task assignment

 

Assignment of tasks to stream instances:
 # We assign active tasks to the instances which were having same task as 
active previously.
 # Active Tasks which couldn't be assigned in first step are assigned to the 
instances which were having same task as standby previously.
 # Active tasks which still couldn't be assigned to instances in round-robin 
starting from least-loaded instance
 # Above 3 steps are same as StickyAssignor as there is only one unique active 
task so no extra rack aware logic is required in this step.
 # Now we have to assign standy-tasks, and here we assign standby to instances 
in different rack then it's active task or other standy-tasks are running. If 
we run out of racks then we can assign standby-tasks in same rack but different 
instances.
 # This makes the assignment rack-aware but more of a best effort and doesn't 
guarantee anything. This is because we might not have capacity left in some 
racks or we might have more number of replicas than number of racks etc

Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't 
change the logic drastically. For example, current assignor is only sticky for 
active tasks, and standby task assignment logic is not sticky as it doesn't 
look for where the task was assigned previously.

Scenario#1

When no RACK_ID is not passed in any of the stream instances.

In this case, assignment will happen as it's happening currently by 
StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are 
considered to be part of single default-rack.

 

Scenario#2

When RACK_ID is passed in all the stream instances.

In this case, all instances belong to one or the other rack, and assignment is 
rack-aware as per above approach

[jira] [Comment Edited] (KAFKA-6642) Rack aware replica assignment in kafka streams

2018-03-27 Thread Ashish Surana (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416778#comment-16416778
 ] 

Ashish Surana edited comment on KAFKA-6642 at 3/28/18 4:18 AM:
---

The current task assignor is sticky, and it can be made rack-aware with a few 
changes, ensuring that the active and standby copies of the same task are 
assigned to different racks as much as possible.

Approach
 # A RACK_ID can be added to StreamsConfig and needs to be passed when starting 
the kafka-streams application. All the processes with the same RACK_ID are 
considered to be in the same rack.
 # No changes in partition-to-task assignment.

 

Assignment of tasks to instances:
 # We assign active tasks to the instances where the same task was previously 
running as active.
 # Active tasks which couldn't be assigned in the first step are assigned to 
the instances where the same task was previously running as standby.
 # Active tasks that still couldn't be assigned are assigned to instances in a 
round-robin way, starting from the least-loaded instance.
 # The above three steps are the same as in the StickyAssignor; since there is 
only one active task for any task_id, no extra rack-aware logic is required for 
assigning active tasks.
 # Now we have to assign standby tasks, and here we assign them to instances 
running in racks other than the one hosting the corresponding active task. If 
we run out of racks, we can assign standby tasks in the same rack but on 
different instances.
 # This makes the assignment rack-aware, but it is best effort and doesn't 
guarantee anything, because some racks might not have capacity left, or there 
might be more replicas than racks, etc.

Note: here we are making the current StickyTaskAssignor rack-aware without 
changing its logic drastically. For example, the current assignor is only 
sticky for active tasks; standby task assignment is not sticky, as it doesn't 
look at where the task was previously assigned.

Scenario#1

RACK_ID is not passed for any of the stream instances.

In this case, assignment happens as it does today with the StickyTaskAssignor. 
All the instances for which RACK_ID is not passed are considered to be part of 
a single default rack.

 

Scenario#2

RACK_ID is passed for all the stream instances.

In this case, every instance belongs to one rack or another, and assignment is 
rack-aware as per the approach above.

 

Scenario#3

RACK_ID is passed for some stream instances but not all.

In this case, all the instances with a RACK_ID belong to the provided racks, 
and all the instances for which RACK_ID was not passed are considered to be 
part of a single default rack.

 

Please let us know what you think about this approach.


was (Author: asurana):
Current task assignor is sticky, and it can be made rack-aware. Where we ensure 
that same tasks (active & replicas) are assigned different racks as much as 
possible.

Approach
 # RACK_ID is added in StreamsConfig file, and needs to be passed while 
starting kafka-streams application. All the processes having same rack_id are 
considered in the same rack.
 # No changes in input topic partition to task assignment

 

Assignment of tasks to stream instances:
 # We assign active tasks to the instances which were having same task as 
active previously.
 # Active Tasks which couldn't be assigned in first step are assigned to the 
instances which were having same task as standby previously.
 # Active tasks which still couldn't be assigned to instances in round-robin 
starting from least-loaded instance
 # Above 3 steps are same as StickyAssignor as there is only one unique active 
task so no extra rack aware logic is required in this step.
 # Now we have to assign standy-tasks, and here we assign standby to instances 
in different rack then it's active task or other standy-tasks are running. If 
we run out of racks then we can assign standby-tasks in same rack but different 
instances.
 # This makes the assignment rack-aware but more of a best effort and doesn't 
guarantee anything. This is because we might not have capacity left in some 
racks or we might have more number of replicas than number of racks etc

Note: Here we are making current StickyTaskAssignor rack-aware, but doesn't 
change the logic drastically. For example, current assignor is only sticky for 
active tasks, and standby task assignment logic is not sticky as it doesn't 
look for where the task was assigned previously.

Scenario#1

When RACK_ID is not passed in any of the stream instances.

In this case, assignment will happen as it's happening currently by 
StickyTaskAssignor. For all the instances for whom RACK_ID is not passed are 
considered to be part of single default-rack.

 

Scenario#2

When RACK_ID is passed in all the stream instances.

In this case, all instances belong to one or the other rack, and assignment is 
rack-awar

[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-27 Thread Matthias J. Sax (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416806#comment-16416806
 ] 

Matthias J. Sax commented on KAFKA-6723:


Makes sense to me. Seems to be related to KAFKA-6657 -- do you think KAFKA-6657 
subsumes this ticket?

One open question, regardless of KAFKA-6657, is whether the default values for 
`max.poll.records` should be different for the two consumers.

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the 
> restore consumer and the normal stream consumer. In reality they handle 
> different processing workloads, and to speed up restoration the restore 
> consumer should be able to achieve higher throughput by setting 
> `max.poll.records` higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (it introduces a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6642) Rack aware task assignment in kafka streams

2018-03-27 Thread Ashish Surana (JIRA)
 [ 
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6642:
-
Summary: Rack aware task assignment in kafka streams  (was: Rack aware 
replica assignment in kafka streams)

> Rack aware task assignment in kafka streams
> ---
>
> Key: KAFKA-6642
> URL: https://issues.apache.org/jira/browse/KAFKA-6642
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware 
> replica 
> assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]).
> This request is to have a similar feature for kafka streams applications. 
> Standby tasks/standby replica assignment in kafka streams is currently not 
> rack aware, and this request is to make it rack aware for better availability.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-27 Thread Guozhang Wang (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416816#comment-16416816
 ] 

Guozhang Wang commented on KAFKA-6723:
--

What are the driving rationales for different default values for different 
consumer types?

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the 
> restore consumer and the normal stream consumer. In reality they handle 
> different processing workloads, and to speed up restoration the restore 
> consumer should be able to achieve higher throughput by setting 
> `max.poll.records` higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (it introduces a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416847#comment-16416847
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

cemo closed pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not 
write offsets of in-mem…
URL: https://github.com/apache/kafka/pull/4782
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 56e6bed0850..bd4f67e9de1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -41,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -334,10 +335,33 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
 
 @Override
 public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+// Find non persistent store's topics
+final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+for (final StateStore store : topology.globalStateStores()) {
+if (!store.persistent() && 
storeToChangelogTopic.containsKey(store.name())) {
+
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+}
+}
+
 checkpointableOffsets.putAll(offsets);
-if (!checkpointableOffsets.isEmpty()) {
+
+final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+// Skip non persistent store
+for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+final String topic = topicPartitionOffset.getKey().topic();
+if (globalNonPersistentStoresTopics.contains(topic)) {
+log.debug("Skipping global store' topic {}", topic);
+} else {
+filteredOffsets.put(topicPartitionOffset.getKey(), 
topicPartitionOffset.getValue());
+}
+}
+
+if (!filteredOffsets.isEmpty()) {
 try {
-checkpoint.write(checkpointableOffsets);
+checkpoint.write(filteredOffsets);
 } catch (IOException e) {
 log.warn("Failed to write offset checkpoint file to {} for 
global stores: {}", checkpoint, e);
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index df8d2010d24..c449ec5f527 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -488,6 +488,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws 
IOException {
 assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
 }
 
+@Test
+public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws 
IOException {
+stateManager.initialize();
+initializeConsumer(10, 1, t3);
+stateManager.register(store3, stateRestoreCallback);
+stateManager.close(Collections.emptyMap());
+
+assertThat(readOffsetsCheckpoint(), 
equalTo(Collections.emptyMap()));
+}
+
 private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
 final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),

 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index ae46b8dadaa..08945d5047a 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -95,7 +95,7 @@ public void close() {
 
 @Override
 public boolean persistent() {
-return false;
+return rocksdbStore;
 }
 
 @Override


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific co

[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-03-27 Thread ASF GitHub Bot (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416859#comment-16416859
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

cemo opened a new pull request #4782: KAFKA-6711: GlobalStateManagerImpl should 
not write offsets of in-mem…
URL: https://github.com/apache/kafka/pull/4782
 
 
   This PR addresses the issue of offsets of non-persistent stores being 
written into the checkpoint file. 
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debug it, 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains 
> offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl 
> implementation and noticed that it is not guarded for cases similar to mine. 
> I am fairly new to Kafka land and probably there might be another way to fix 
> issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-27 Thread Srinivas Dhruvakumar (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416862#comment-16416862
 ] 

Srinivas Dhruvakumar commented on KAFKA-:
-

[~huxi_2b] Tried the latest patch from KAFKA-3978; still hitting this bug. 

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> Currently we are seeing a few under-replicated partitions on our test cluster, 
> which is used for integration testing. On debugging further, we found that the 
> replica thread was stopped due to an error: 
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-27 Thread Srinivas Dhruvakumar (JIRA)
[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416912#comment-16416912
 ] 

Srinivas Dhruvakumar commented on KAFKA-6649:
-

[~hachikuji] Still hitting the bug after testing with the patch. 

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or the exception should be caught and we should 
> keep calling doWork()
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)