[jira] [Commented] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Umesh Chaudhary (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074389#comment-16074389
 ] 

Umesh Chaudhary commented on KAFKA-5556:


[~damianguy], I can work on this. 

> KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve 
> exception from future which hasn't failed
> --
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}
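
For illustration, a minimal sketch (in the spirit of the suggestion above) of a retry 
loop that only inspects the future once it has completed; the surrounding names 
({{sendOffsetCommitRequest}}, {{client.poll}}, {{retryBackoffMs}}) follow 
{{ConsumerCoordinator}} conventions but are assumptions here, not the actual patch:
{code}
// Hedged sketch only: poll may return because the timeout elapsed, not because the
// future completed, so guard with isDone() before calling isRetriable()/exception().
boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeoutMs) {
    long now = time.milliseconds();
    final long deadline = now + timeoutMs;
    while (now < deadline) {
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        client.poll(future, deadline - now);      // may return on timeout without completing

        if (!future.isDone()) {                   // nothing to inspect yet; retry while time remains
            now = time.milliseconds();
            continue;
        }
        if (future.succeeded())
            return true;                          // commit completed successfully
        if (!future.isRetriable())
            throw future.exception();             // safe: the future has actually failed

        time.sleep(retryBackoffMs);               // retriable failure: back off and try again
        now = time.milliseconds();
    }
    return false;                                 // ran out of time without a completed commit
}
{code}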



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor

2017-07-05 Thread Paolo Patierno (JIRA)
Paolo Patierno created KAFKA-5557:
-

 Summary: Using a logPrefix inside the StreamPartitionAssignor
 Key: KAFKA-5557
 URL: https://issues.apache.org/jira/browse/KAFKA-5557
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Paolo Patierno
Assignee: Paolo Patierno
Priority: Trivial


Hi,
the "stream-thread [%s]" is replicated more times in all the logging messages 
inside the StreamPartitionAssignor. Using a logPrefix like for the StreamThread 
class could be better.
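
A minimal sketch of what this could look like, assuming a {{logPrefix}} field built 
once from the thread name (field and message names here are illustrative, not the 
eventual patch):
{code}
// Illustrative only: build the prefix once instead of formatting the thread name
// in every log statement inside StreamPartitionAssignor.
private String logPrefix;

@Override
public void configure(final Map<String, ?> configs) {
    // ... existing configuration ...
    this.logPrefix = String.format("stream-thread [%s]", streamThread.getName());
}

// later, in the various logging calls:
log.debug("{} Constructed client metadata from the member subscriptions.", logPrefix);
{code}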



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5537) Subscribe Earliest is not working as in 0.10.2.1

2017-07-05 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074428#comment-16074428
 ] 

Rajini Sivaram commented on KAFKA-5537:
---

[~michael.andre.pearce] I have run your test and it passes consistently for me 
if I increase the first sleep (before produce) from 4000 to 8000. Your logs for 
0.11.0 show that you are producing before partitions are assigned. With offsets 
reset to "latest", this will mean that the messages produced before assignment 
are not consumed.

From your logs for 0.11.0:
{quote}
2017-06-30 23:02:15 INFO  AbstractCoordinator:597 - Discovered coordinator 
localhost:52196 (id: 2147483646 rack: null) for group test-group.
2017-06-30 23:02:15 INFO  AbstractCoordinator:597 - Discovered coordinator 
localhost:52196 (id: 2147483646 rack: null) for group test-group.
...
2017-06-30 23:02:18 TRACE KafkaProducer:740 - Sending record 
ProducerRecord(topic=topic, partition=null, headers=RecordHeaders(headers = [], 
isReadOnly = true), key=null, value=[B@62379589, timestamp=null) with callback 
null to topic topic partition 0
...
2017-06-30 23:02:21 INFO  ConsumerCoordinator:262 - Setting newly assigned 
partitions [] for group test-group
2017-06-30 23:02:21 INFO  ConsumerCoordinator:262 - Setting newly assigned 
partitions (topic-0) for group test-group
{quote}

Messages are produced 4 seconds after the consumer was created. But at this 
point, rebalancing hasn't completed and no partitions have been assigned to the 
consumers. When the partitions are assigned 3 seconds later, consumers start 
consuming from the "latest" offset at that point, ignoring already produced 
messages. With {{AUTO_OFFSET_RESET_CONFIG=earliest}}, this would have worked.

With a lower {{group.initial.rebalance.delay.ms}}, the test works since the rebalance 
completes sooner. When your test was waiting for 60 seconds, can you check whether 
the wait was before producing messages when the offset reset strategy is {{latest}}?
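
One way to make such a test independent of the rebalance delay is to wait for the 
assignment instead of sleeping for a fixed time before producing. A rough sketch 
(bootstrap server, topic, group id and serializers are assumptions for illustration):
{code}
// Sketch only: poll until the rebalance completes and partitions are assigned,
// then produce. With auto.offset.reset=latest, records produced before this point
// would not be seen by the consumer.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));

// Drive the group protocol until partitions are actually assigned.
while (consumer.assignment().isEmpty())
    consumer.poll(100);

// Only now produce the records the "latest" consumer is expected to see.
{code}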

> Subscribe Earliest is not working as in 0.10.2.1
> 
>
> Key: KAFKA-5537
> URL: https://issues.apache.org/jira/browse/KAFKA-5537
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Michael Andre Pearce
>Priority: Critical
> Attachments: kafka_0.10.2.1.log, kafka_0.11.0.0.log, KafkaSub.java, 
> KafkaSubLatest.java
>
>
> We have seen issue with subscription where auto offset when set to earliest 
> (and also latest) does not behave the same as in 0.10.2.1 release.
> We have managed to create a repeatable test for this, which passes when 
> pointing to 0.10.2.1 broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5519) Support for multiple certificates in a single keystore

2017-07-05 Thread Tom Bentley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074427#comment-16074427
 ] 

Tom Bentley commented on KAFKA-5519:


Can you explain why having a keystore per component is particularly 
problematic, but using multiple certificates in a keystore and using aliases 
would make things significantly better?

> Support for multiple certificates in a single keystore
> --
>
> Key: KAFKA-5519
> URL: https://issues.apache.org/jira/browse/KAFKA-5519
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Alla Tumarkin
>  Labels: upstream-issue
>
> Background
> Currently, we need to have a keystore exclusive to the component with exactly 
> one key in it. Looking at the JSSE Reference guide, it seems like we would 
> need to introduce our own KeyManager into the SSLContext which selects a 
> configurable key alias name.
> https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509KeyManager.html 
> has methods for dealing with aliases.
> The goal here is to use a specific certificate (with proper ACLs set for this 
> client), and not just the first one that matches.
> Looks like it requires a code change to the SSLChannelBuilder
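
For reference, a rough sketch of the kind of delegating {{X509KeyManager}} described 
above, which always chooses a configured alias; the class name and the way it would be 
wired into the channel builder are assumptions, not an agreed design:
{code}
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.X509KeyManager;

// Wraps the default key manager and always presents the configured alias
// instead of the first key that matches.
public class AliasSelectingKeyManager implements X509KeyManager {
    private final X509KeyManager delegate;
    private final String alias;

    public AliasSelectingKeyManager(X509KeyManager delegate, String alias) {
        this.delegate = delegate;
        this.alias = alias;
    }

    @Override
    public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
        return alias;
    }

    @Override
    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
        return alias;
    }

    // Everything else is delegated to the default JSSE implementation.
    @Override
    public String[] getClientAliases(String keyType, Principal[] issuers) {
        return delegate.getClientAliases(keyType, issuers);
    }

    @Override
    public String[] getServerAliases(String keyType, Principal[] issuers) {
        return delegate.getServerAliases(keyType, issuers);
    }

    @Override
    public X509Certificate[] getCertificateChain(String alias) {
        return delegate.getCertificateChain(alias);
    }

    @Override
    public PrivateKey getPrivateKey(String alias) {
        return delegate.getPrivateKey(alias);
    }
}
{code}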



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074480#comment-16074480
 ] 

ASF GitHub Bot commented on KAFKA-5557:
---

GitHub user ppatierno opened a pull request:

https://github.com/apache/kafka/pull/3488

KAFKA-5557: Using a logPrefix inside the StreamPartitionAssignor

Added a logPrefix to avoid repeating the stream thread name formatting in 
every log message

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppatierno/kafka kafka-5557

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3488


commit 87a7400acc5feda5828a7fa1604df28a4690c7e2
Author: ppatierno 
Date:   2017-07-05T09:18:02Z

Added logPrefix for avoiding stream thread name formatting replicated more 
times




> Using a logPrefix inside the StreamPartitionAssignor
> 
>
> Key: KAFKA-5557
> URL: https://issues.apache.org/jira/browse/KAFKA-5557
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Trivial
>
> Hi,
> the "stream-thread [%s]" is replicated more times in all the logging messages 
> inside the StreamPartitionAssignor. Using a logPrefix like for the 
> StreamThread class could be better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Fix Version/s: 0.10.2.1
   0.11.0.1

> KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve 
> exception from future which hasn't failed
> --
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
> Fix For: 0.10.2.1, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Priority: Critical  (was: Major)

> KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve 
> exception from future which hasn't failed
> --
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Priority: Critical
> Fix For: 0.10.2.1, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-5556:
--

Assignee: Umesh Chaudhary

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Summary: KafkaConsumer.commitSync throws IllegalStateException: Attempt to 
retrieve exception from future which hasn't failed  (was: KafkaConsumer throws: 
java.lang.IllegalStateException: > Attempt to retrieve exception from future 
which hasn't failed)

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5556:
---
Fix Version/s: (was: 0.10.2.1)
   0.10.2.2

> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)
zhu fangbo created KAFKA-5558:
-

 Summary: can not connect to the unsecure port after config 
SASL/PLAIN
 Key: KAFKA-5558
 URL: https://issues.apache.org/jira/browse/KAFKA-5558
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Affects Versions: 0.10.1.1
Reporter: zhu fangbo


Dear All, 
I followed "Modifying SASL mechanism in a running cluster" to set up a cluster with 
one broker using SASL/PLAIN to authorize clients. Here are the configurations:
server config
server.properties:
listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
jaas.conf, works well when I connect to the secure port (9094), but when I use 
the default security.protocol and connect to the insecure port (9093), the 
producer cannot fetch metadata:
DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
Sending metadata request {topics=[test]} to node -1
WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Error 
while fetching metadata with correlation id 0 : 
{test=UNKNOWN_TOPIC_OR_PARTITION}
DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
Sending metadata request {topics=[test]} to node 1
WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Error 
while fetching metadata with correlation id 2 : 
{test=UNKNOWN_TOPIC_OR_PARTITION}
Why can the insecure port not be connected to after configuring SASL authorization?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu fangbo updated KAFKA-5558:
--
Issue Type: Bug  (was: New Feature)

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a running cluster" to set up a cluster 
> with one broker using SASL/PLAIN to authorize clients. Here are the configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the insecure port (9093), the 
> producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the insecure port not be connected to after configuring SASL authorization?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074537#comment-16074537
 ] 

Rajini Sivaram commented on KAFKA-5558:
---

The error indicates that the PLAINTEXT producer has connected successfully to a 
broker, so this is likely to be an authorization issue. Have you granted access to 
the topic for the PLAINTEXT user? The user name is ANONYMOUS.
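
For example, granting produce access to the unauthenticated principal could look 
something like the following (the ZooKeeper address is an assumption):
{code}
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:ANONYMOUS \
  --producer --topic test
{code}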

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a running cluster" to set up a cluster 
> with one broker using SASL/PLAIN to authorize clients. Here are the configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the insecure port (9093), the 
> producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the insecure port not be connected to after configuring SASL authorization?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074567#comment-16074567
 ] 

zhu fangbo commented on KAFKA-5558:
---

Rajini Sivaram,
Thanks for your advice, it really helps. I think I should read the code of 
KafkaServer carefully.

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a running cluster" to set up a cluster 
> with one broker using SASL/PLAIN to authorize clients. Here are the configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the insecure port (9093), the 
> producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the insecure port not be connected to after configuring SASL authorization?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074573#comment-16074573
 ] 

ASF GitHub Bot commented on KAFKA-5556:
---

GitHub user umesh9794 opened a pull request:

https://github.com/apache/kafka/pull/3489

KAFKA-5556 : KafkaConsumer.commitSync throws IllegalStateException: A…

This PR makes the `commitOffsetsSync` method check whether the future is 
completed after the client's poll. 

Tests: All existing tests pass, especially 
"`testCommitOffsetSyncCallbackWithNonRetriableException`". Not sure 
if we need to add any dedicated tests for this minor change. 

Awaiting your review comments. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/umesh9794/kafka KAFKA-5556

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3489


commit 1d44ca726026aea7bb030d5eecb5d4a197b5b0b9
Author: umesh chaudhary 
Date:   2017-07-05T10:45:59Z

KAFKA-5556 : KafkaConsumer.commitSync throws IllegalStateException: Attempt 
to retrieve exception from future which hasn't failed




> KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve 
> exception from future which hasn't failed
> 
>
> Key: KAFKA-5556
> URL: https://issues.apache.org/jira/browse/KAFKA-5556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Umesh Chaudhary
>Priority: Critical
> Fix For: 0.10.2.2, 0.11.0.1
>
>
> From the user list:
> I have been running a streaming application on some data set. Things
> usually run ok. Today I was trying to run the same application on Kafka
> (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After
> running for quite some time, I got the following exception ..
> {code}
> Exception in thread "StreamThread-1" java.lang.IllegalStateException:
> > Attempt to retrieve exception from future which hasn't failed
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> > at
> > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> {code}
> Looks like we should check if the future is done, i.e., check the return 
> value from poll and retry if time is remaining and {{!future.isDone()}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN

2017-07-05 Thread zhu fangbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu fangbo resolved KAFKA-5558.
---
Resolution: Fixed

This is not a bug.

> can not connect to the unsecure port after config SASL/PLAIN
> 
>
> Key: KAFKA-5558
> URL: https://issues.apache.org/jira/browse/KAFKA-5558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> Dear All, 
> I followed "Modifying SASL mechanism in a running cluster" to set up a cluster 
> with one broker using SASL/PLAIN to authorize clients. Here are the configurations:
> server config
> server.properties:
> listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin
> kafka_server_jaas.conf:
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> };
> My producer, configured with security.protocol=SASL_PLAINTEXT and a correct 
> jaas.conf, works well when I connect to the secure port (9094), but when I 
> use the default security.protocol and connect to the insecure port (9093), the 
> producer cannot fetch metadata:
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node -1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 0 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Sending metadata request {topics=[test]} to node 1
> WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] 
> Error while fetching metadata with correlation id 2 : 
> {test=UNKNOWN_TOPIC_OR_PARTITION}
> Why can the insecure port not be connected to after configuring SASL authorization?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:03 AM:


*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 


was (Author: neil.avery):
*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating the segment id 
> and uses {{SimpleDateFormat}} for formatting the segment id.  However this is 
> a high volume code path and creating a new {{SimpleDateFormat}} and 
> formatting each segment id is expensive.  We should look into removing the 
> date from the segment id or at a minimum use a faster alternative to 
> {{SimpleDateFormat}}.  We should also consider keeping a lookup of existing 
> segments to avoid as many string operations as possible.
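
To illustrate the direction discussed in the comments above, a rough sketch contrasting 
a date-formatted segment name with the "trunc/floor unix time" alternative; the interval, 
format pattern and name layout are assumptions, not the actual {{Segments}} code:
{code}
import java.text.SimpleDateFormat;
import java.util.Date;

public class SegmentNameSketch {
    static final long SEGMENT_INTERVAL_MS = 60_000L;   // illustrative one-minute segments

    // current style: a Calendar-backed formatter on a hot path
    static String dateFormattedName(String storeName, long timestamp) {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
        long segmentId = timestamp / SEGMENT_INTERVAL_MS;
        return storeName + "-" + formatter.format(new Date(segmentId * SEGMENT_INTERVAL_MS));
    }

    // "trunc/floor unix time" alternative: integer arithmetic only, no Calendar involved
    static String numericName(String storeName, long timestamp) {
        long segmentId = timestamp / SEGMENT_INTERVAL_MS;
        return storeName + "-" + segmentId * SEGMENT_INTERVAL_MS;
    }
}
{code}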



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:03 AM:


*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 


was (Author: neil.avery):
I've taken a look at dropping SimpleDateFormat and replacing it with 
commons-lang3-FastDateFormat (available in project but not a dependency on this 
module). 

Microbenchmarking diffs show SDF starts at 800ms/million then hotspots down to 
250ms. Interestingly FDF starts at 400ms/million then gets down to 350ms (not 
very convincing). Calendar usage sucks performance and there is a degree of 
caching inside both of the impls. Looking at this in a different way "Segments" 
is a time-series slice/bucketing function to group/allocate/lookup segments 
etc. 

Does a real world calendar matter? - I've knocked together a simple math 
alternative that breaks time into slices where all months/years are equal size. 
The time formatting is identical but day/month will be incorrect as a result of 
no calendar. This gets down to 150ms pretty much straight away (SDF is still 
used for parsing).

All tests pass, system runs fine etc - but I'm not sure of the gravity of this 
as a possible change - will it break things - any advice or feedback?

> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating the segment id 
> and uses {{SimpleDateFormat}} for formatting the segment id.  However this is 
> a high volume code path and creating a new {{SimpleDateFormat}} and 
> formatting each segment id is expensive.  We should look into removing the 
> date from the segment id or at a minimum use a faster alternative to 
> {{SimpleDateFormat}}.  We should also consider keeping a lookup of existing 
> segments to avoid as many string operations as possible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:06 AM:


*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify the problem? If so, switching to 
unix-time/flooring-to-minute would be preferred.


was (Author: neil.avery):
*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating the segment id 
> and uses {{SimpleDateFormat}} for formatting the segment id.  However this is 
> a high volume code path and creating a new {{SimpleDateFormat}} and 
> formatting each segment id is expensive.  We should look into removing the 
> date from the segment id or at a minimum use a faster alternative to 
> {{SimpleDateFormat}}.  We should also consider keeping a lookup of existing 
> segments to avoid as many string operations as possible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:18 AM:


*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can show that performance is really a problem? If so, switching to 
unix-time/flooring-to-minute would be preferred, provided the 
upgrade-path/operational costs work. 

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637





was (Author: neil.avery):
*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify the problem? If so, switching to 
unix-time/flooring-to-minute would be preferred.

> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating the segment id 
> and uses {{SimpleDateFormat}} for formatting the segment id.  However this is 
> a high volume code path and creating a new {{SimpleDateFormat}} and 
> formatting each segment id is expensive.  We should look into removing the 
> date from the segment id or at a minimum use a faster alternative to 
> {{SimpleDateFormat}}.  We should also consider keeping a lookup of existing 
> segments to avoid as many string operations as possible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:26 AM:


*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and *format*ting during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify that performance is really a problem? If so, switch to 
unix-time/flooring-to-minute where the upgrade-path/operational costs work. 

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637





was (Author: neil.avery):
*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and for *formatting* during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage.

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify that performance is really a problem? In which case 
switching to unix-time/flooring-to-minute would be preferred provided the 
upgrade-path/operational costs work. 

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637




> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating the segment id 
> and uses {{SimpleDateFormat}} for formatting the segment id.  However this is 
> a high volume code path and creating a new {{SimpleDateFormat}} and 
> formatting each segment id is expensive.  We should look into removing the 
> date from the segment id or at a minimum use a faster alternative to 
> {{Simp

[jira] [Assigned] (KAFKA-5255) Auto generate request/response classes

2017-07-05 Thread Tom Bentley (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-5255:
--

Assignee: Tom Bentley

> Auto generate request/response classes
> --
>
> Key: KAFKA-5255
> URL: https://issues.apache.org/jira/browse/KAFKA-5255
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ismael Juma
>Assignee: Tom Bentley
> Fix For: 0.11.1.0
>
>
> We should automatically generate the request/response classes from the 
> protocol definition. This is a major source of boilerplate, development 
> effort and inconsistency at the moment. If we auto-generate the classes, we 
> may also be able to avoid the intermediate `Struct` representation.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId

2017-07-05 Thread Evgeny Veretennikov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074714#comment-16074714
 ] 

Evgeny Veretennikov commented on KAFKA-5503:


[~hachikuji], part of the call stack for the blocking idempotent producer is:

{noformat}
Selector.poll(long): L321
NetworkClient.poll(long, long): L433
NetworkClientUtils.sendAndReceive(KafkaClient, ClientRequest, Time): L89
Sender.sendAndAwaitInitProducerIdRequest(): L405
Sender.maybeWaitForProducerId(): L419
Sender.run(long): L204
{noformat}

So, the selector blocks the thread. When we invoke the {{initiateClose()}} method, part 
of the call stack is:

{noformat}
Selector.wakeUp(): 240
NetworkClient.wakeUp(): L498
Sender.wakeup(): L675
Sender.initiateClose(): L390
{noformat}

So it seems that the blocking selector will actually be woken up, and thus the 
sender thread won't stay blocked after a concurrent {{initiateClose()}} call.

So, is this issue still relevant? Am I missing something?
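
A generic java.nio sketch (not Kafka code) of the reasoning above - a thread blocked in 
{{select()}} returns as soon as another thread calls {{wakeup()}} on the same selector, 
which is what the {{initiateClose()}} path relies on. Class and sleep values are made up:

{code}
import java.io.IOException;
import java.nio.channels.Selector;

// Hypothetical standalone demo; not part of Kafka.
public final class WakeupDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        final Selector selector = Selector.open();

        Thread closer = new Thread(() -> {
            try {
                Thread.sleep(1000);      // stand-in for a concurrent initiateClose()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            selector.wakeup();           // unblocks the select() below
        });
        closer.start();

        long start = System.currentTimeMillis();
        selector.select();               // would block indefinitely without the wakeup
        System.out.println("select() returned after "
                + (System.currentTimeMillis() - start) + " ms");
        closer.join();
        selector.close();
    }
}
{code}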

> Idempotent producer ignores shutdown while fetching ProducerId
> --
>
> Key: KAFKA-5503
> URL: https://issues.apache.org/jira/browse/KAFKA-5503
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Evgeny Veretennikov
> Fix For: 0.11.0.1
>
>
> When using the idempotent producer, we initially block the sender thread 
> while we attempt to get the ProducerId. During this time, a concurrent call 
> to close() will be ignored.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 1:04 PM:
---

*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and for *formatting* during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify that performance is really a problem? If so, switch to 
unix-time/flooring-to-minute where the upgrade-path/operational costs work. 

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637

Note: Segment granularity currently defaults to 1 minute and as a result works 
with the SDF formatter. If finer granularity is required, i.e. 10s, 20s, 5m etc, 
then using a unix-long (numeric) derivative would make it possible to roll to the 
appropriate boundary through a configured floor function.
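
A minimal sketch of that flooring idea (illustrative only, not the actual WIP code; the 
store name and interval are made up):

{code}
public final class SegmentFloorSketch {
    public static void main(String[] args) {
        long segmentIntervalMs = 60_000L;                       // assumed 1-minute granularity
        long recordTimestampMs = System.currentTimeMillis();

        // Floor the timestamp to the interval to get a numeric segment id,
        // avoiding SimpleDateFormat/Calendar entirely.
        long segmentId = recordTimestampMs / segmentIntervalMs;
        long segmentStartMs = segmentId * segmentIntervalMs;

        // Numeric file name instead of a formatted date, e.g. "mystore-1499258460000".
        String segmentName = "mystore-" + segmentStartMs;

        // Parsing back is a plain Long.parseLong on the suffix, no Calendar involved.
        long parsedStartMs = Long.parseLong(segmentName.substring("mystore-".length()));
        System.out.println(segmentName + " -> " + parsedStartMs);
    }
}
{code}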



was (Author: neil.avery):
*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and for *formatting* during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify that performance is really a problem? If so, switch to 
unix-time/flooring-to-minute where the upgrade-path/operational costs work. 

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637




> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://issues.apache.org/jira/browse/KAFKA-5515
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Neil Avery
>  Labels: performance
>
> Currently the {{Segments}} class uses a date when calculating 

[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class

2017-07-05 Thread Neil Avery (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546
 ] 

Neil Avery edited comment on KAFKA-5515 at 7/5/17 1:04 PM:
---

*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and for *formatting* during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify that performance is really a problem? If so, switch to 
unix-time/flooring-to-minute where the upgrade-path/operational costs work. 

*Current WIP commits can be seen here:*
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637

*Note:* Segment granularity currently defaults to 1 minute and as a result works 
with the SDF formatter. If finer granularity is required, i.e. 10s, 20s, 5m etc, 
then using a unix-long (numeric) derivative would make it possible to roll to the 
appropriate boundary through a configured floor function.



was (Author: neil.avery):
*Investigation:*
Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* 
Segment file names during initialisation, and for *formatting* during runtime. I 
presume the suggested problem lies in the formatting.

*Micro benchmark SDF*
Formatting 1,000,000 items takes 250ms once hotspotting has kicked in.  Per/M 
items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc]
Parsing is slow - 2500ms per 1,000,000 items

Commons-lang3-FastDateFormat is available in the project but not as a 
dependency on this particular module. FDF micro-bench starts at 400ms/million 
then gets down to 350ms (not very convincing). 

Calendar usage sucks performance and there is a degree of caching inside both 
of the impls. 

Looking at this in a different way "Segments" is a time-series slice/bucketing 
function to group/allocate/lookup segments etc. 

I've knocked together a simple math alternative that breaks time into slices 
where all months/years are equal size, i.e. not using a calendar - you get an 
approximate idea of performance: 150-200ms without hotspotting. The problem is 
that a real calendar is still used upon initialisation to extract segment-ids - 
there will be inconsistencies and likely breakage. _Note: Code can be viewed in 
the commit log at the bottom_

*Best performance*
The best alternative would be to ditch calendars for parsing and formatting and 
to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename 
readability). I'm not sure if there will be operational upgrade paths etc in 
order to make the change seamless. 

I think we need more specifics around the problem itself - are there real-world 
stats that can identify that performance is really a problem? If so, switch to 
unix-time/flooring-to-minute where the upgrade-path/operational costs work. 

Current WIP commits can be seen here:
https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637

Note: Segment granularity currently defaults to 1 minute and as a result works 
with the SDF formatter. If finer granularity is required, i.e. 10s, 20s, 5m etc, 
then using a unix-long (numeric) derivative would make it possible to roll to the 
appropriate boundary through a configured floor function.


> Consider removing date formatting from Segments class
> -
>
> Key: KAFKA-5515
> URL: https://

[jira] [Commented] (KAFKA-5500) it is impossible to have custom Login Modules for PLAIN SASL mechanism

2017-07-05 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074720#comment-16074720
 ] 

Rajini Sivaram commented on KAFKA-5500:
---

Kafka's implementation of {{PlainLoginModule}} is tightly integrated with its 
implementation of {{PlainSaslServerProvider}} and {{PlainSaslServer}} (the 
server provider is loaded by the login module). At the moment, you can replace 
the whole server-side SASL/PLAIN implementation with your own implementation by 
replacing the three classes in {{org.apache.kafka.common.security.plain}} with 
your own implementation. As described in the docs 
(https://kafka.apache.org/documentation/#security_sasl_plain_production), the 
implementation in Kafka is provided as a sample and is not suitable for use in 
production.

[KIP-86|https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers]
 should improve customization.

> it is impossible to have custom Login Modules for PLAIN SASL mechanism
> --
>
> Key: KAFKA-5500
> URL: https://issues.apache.org/jira/browse/KAFKA-5500
> Project: Kafka
>  Issue Type: Wish
>Reporter: Anton Patrushev
>Priority: Minor
>
> This change -
>  
> https://github.com/apache/kafka/commit/275c5e1df237808fe72b8d9933f826949d4b5781#diff-3e86ea3ab586f9b6f920c00508a0d5bcR95
>  - makes it impossible have login modules other than PlainLoginModule used 
> for PLAIN SASL mechanism. Could it be changed the way that doesn't use 
> particular login module class name?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4249) Document how to customize GC logging options for broker

2017-07-05 Thread Evgeny Veretennikov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgeny Veretennikov reassigned KAFKA-4249:
--

Assignee: Tom Bentley

> Document how to customize GC logging options for broker
> ---
>
> Key: KAFKA-4249
> URL: https://issues.apache.org/jira/browse/KAFKA-4249
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Jim Hoagland
>Assignee: Tom Bentley
>
> We wanted to enable GC logging for Kafka broker and saw that you can set 
> GC_LOG_ENABLED=true.  However, this didn't do what we wanted.  For example, 
> the GC log will be overwritten every time the broker gets restarted.  It 
> wasn't clear how we could do that (no documentation of it that I can find), 
> so I did some research by looking at the source code and did some testing and 
> found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to 
> starting broker.  I posted my solution to StackOverflow:
>   
> http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove
> (feel free to critique)
> That solution is now public, but it seems like the Kafka documentation should 
> say how to do this.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-3823) QuickStart documentation is still showing MirrorMakers supports more than one consumer.config

2017-07-05 Thread Evgeny Veretennikov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgeny Veretennikov resolved KAFKA-3823.

Resolution: Fixed

Closed issue, as [~tombentley] suggested.

> QuickStart documentation is still showing MirrorMakers supports more than one 
> consumer.config
> -
>
> Key: KAFKA-3823
> URL: https://issues.apache.org/jira/browse/KAFKA-3823
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.9.0.0
>Reporter: Chak Lee
>Priority: Minor
>
> On the official QuickStart documentation, the MirrorMaker section is still 
> showing the following example:
> {code}
> bin/kafka-mirror-maker.sh
>--consumer.config consumer-1.properties --consumer.config 
> consumer-2.properties
>--producer.config producer.properties --whitelist my-topic
> {code}
> However, the support for this was already dropped in KAFKA-1650.  If you 
> try to run the above script, you will get the following error:
> {code}
> [2016-06-10 18:35:11,201] ERROR Exception when starting mirror maker. 
> (kafka.tools.MirrorMaker$)
> joptsimple.MultipleArgumentsForOptionException: Found multiple arguments for 
> option consumer.config, but you asked for only one
> {code}
> Please update the website's QuickStart section for MirrorMakers.  Thanks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4609) KTable/KTable join followed by groupBy and aggregate/count can result in incorrect results

2017-07-05 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075005#comment-16075005
 ] 

Damian Guy commented on KAFKA-4609:
---

This was partially fixed by 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+state+stores+and+improved+semantics
If you use one of the join/leftJoin/outerJoin methods that take either a 
{{StateStoreSupplier}} or {{queryableName}} as a param then it works. However, 
for the basic join/leftJoin/outerJoin method it doesn't work. In order to make 
it work properly we need to add another param to these join methods, 
{{joinSerde}}, so that we can construct the state store etc.

This would require a KIP. However as we are currently discussing DSL changes to 
remove overloads I'd recommend we hold until we know which direction we are 
going. 
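
A rough sketch of the KIP-114 style overload mentioned above (signatures recalled from 
the 0.11.0 API and topic/store names invented, so treat the details as assumptions):

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class JoinWithQueryableStore {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KTable<String, Long> left =
                builder.table(Serdes.String(), Serdes.Long(), "left-topic", "left-store");
        KTable<String, Long> right =
                builder.table(Serdes.String(), Serdes.Long(), "right-topic", "right-store");

        // Passing the result serde and a queryable store name materializes the join
        // result - the variant reported above to produce correct aggregations downstream.
        KTable<String, Long> joined =
                left.join(right, (l, r) -> l + r, Serdes.Long(), "joined-store");
    }
}
{code}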

> KTable/KTable join followed by groupBy and aggregate/count can result in 
> incorrect results
> --
>
> Key: KAFKA-4609
> URL: https://issues.apache.org/jira/browse/KAFKA-4609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>  Labels: architecture
>
> When caching is enabled, KTable/KTable joins can result in duplicate values 
> being emitted. This will occur if there were updates to the same key in both 
> tables. Each table is flushed independently, and each table will trigger the 
> join, so you get two results for the same key. 
> If we subsequently perform a groupBy and then aggregate operation we will now 
> process these duplicates resulting in incorrect aggregated values. For 
> example count will be double the value it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075053#comment-16075053
 ] 

Damian Guy commented on KAFKA-5545:
---

As you suggested, it would seem that the first KafkaStreams instance hasn't 
closed successfully as some threads are stuck. You should probably try calling 
{{boolean KafkaStreams.close(timeout, timeunit)}} and check the return value. 
If the result is {{false}} then you should probably terminate the application 
and restart.
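
A minimal sketch of that suggestion ({{streams}} is assumed to be the already-running 
instance; the timeout value is arbitrary):

{code}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

final class StreamsShutdown {
    static void closeOrRestart(KafkaStreams streams) {
        boolean closed = streams.close(30, TimeUnit.SECONDS);   // bounded shutdown
        if (!closed) {
            // Some stream threads are stuck; an in-process rebuild is unlikely to
            // recover, so exit and let a supervisor restart the application.
            System.exit(1);
        }
    }
}
{code}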

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-de

[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075056#comment-16075056
 ] 

Matthias J. Sax commented on KAFKA-5070:


I was working on https://issues.apache.org/jira/browse/KAFKA-5167 and hoping 
that it will cover this JIRA as well. Thoughts?

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the 
> state directory: /opt/rocksdb/pulse10/0_18
> 
>
> Key: KAFKA-5070
> URL: https://issues.apache.org/jira/browse/KAFKA-5070
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux Version
>Reporter: Dhana
>Assignee: Matthias J. Sax
> Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instance of consumer in two difference machines/nodes.
> we have 400 partitions. 200  stream threads/consumer, with 2 consumer.
> We perform HA test(on rebalance - shutdown of one of the consumer/broker), we 
> see this happening
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not 
> create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock 
> the state directory: /opt/rocksdb/pulse10/0_115
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075061#comment-16075061
 ] 

Yogesh BG commented on KAFKA-5545:
--

I see. But what could be the problem in closing the streams? I don't see
restarting the application as a good idea. From the log we can see some threads still
polling to connect to the old ip. We should try closing those threads, right?

One more thing: if I call close within the connection timeout, all goes well.
But if I issue close after the connection timeout, the threads are stuck.




> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp

[jira] [Commented] (KAFKA-5519) Support for multiple certificates in a single keystore

2017-07-05 Thread Alla Tumarkin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075164#comment-16075164
 ] 

Alla Tumarkin commented on KAFKA-5519:
--

I wouldn't call it problematic: I just imagine there are situations where 
multiple J2EE applications may want to share a single keystore and import their 
client certificates into it - in order to decrease management 
overhead by not having to maintain multiple keystores (like managing keystore 
passwords, for example).

> Support for multiple certificates in a single keystore
> --
>
> Key: KAFKA-5519
> URL: https://issues.apache.org/jira/browse/KAFKA-5519
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Alla Tumarkin
>  Labels: upstream-issue
>
> Background
> Currently, we need to have a keystore exclusive to the component with exactly 
> one key in it. Looking at the JSSE Reference guide, it seems like we would 
> need to introduce our own KeyManager into the SSLContext which selects a 
> configurable key alias name.
> https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509KeyManager.html 
> has methods for dealing with aliases.
> The goal here is to use a specific certificate (with proper ACLs set for this 
> client), and not just the first one that matches.
> Looks like it requires a code change to the SSLChannelBuilder
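
One possible shape for such a key manager, purely illustrative (class and field names are 
invented, only alias selection is sketched, and wiring it into the Kafka SSL channel 
builder is exactly the code change the description mentions):

{code}
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509KeyManager;

// Wraps the default key manager and always chooses a configured alias instead of
// "the first one that matches". Server-side selection is simply delegated.
final class FixedAliasKeyManager extends X509ExtendedKeyManager {
    private final X509KeyManager delegate;
    private final String alias;

    FixedAliasKeyManager(X509KeyManager delegate, String alias) {
        this.delegate = delegate;
        this.alias = alias;
    }

    @Override
    public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
        return alias;                        // pin the client certificate
    }

    @Override
    public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
        return alias;                        // SSLEngine-based connections use this path
    }

    @Override
    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
        return delegate.chooseServerAlias(keyType, issuers, socket);
    }

    @Override
    public String[] getClientAliases(String keyType, Principal[] issuers) {
        return delegate.getClientAliases(keyType, issuers);
    }

    @Override
    public String[] getServerAliases(String keyType, Principal[] issuers) {
        return delegate.getServerAliases(keyType, issuers);
    }

    @Override
    public X509Certificate[] getCertificateChain(String alias) {
        return delegate.getCertificateChain(alias);
    }

    @Override
    public PrivateKey getPrivateKey(String alias) {
        return delegate.getPrivateKey(alias);
    }
}
{code}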



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-5528:


> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5528.

Resolution: Not A Bug

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075191#comment-16075191
 ] 

Matthias J. Sax commented on KAFKA-5528:


Added FAQ: 
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata?
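
A hedged sketch of the pattern that FAQ entry is about (assuming the usual cause: a 
single {{Processor}} instance shared across tasks). {{ProcessorSupplier.get()}} has to 
return a new instance on every call so each task gets its own {{ProcessorContext}}; class 
name below is made up:

{code}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

class MetadataProcessorSupplier implements ProcessorSupplier<byte[], byte[]> {
    @Override
    public Processor<byte[], byte[]> get() {
        // Never return a pre-created/shared field here; create a fresh processor per call.
        return new AbstractProcessor<byte[], byte[]>() {
            @Override
            public void process(byte[] key, byte[] value) {
                String topic = context().topic();   // safe: one context per task
                // ... business logic ...
            }
        };
    }
}
{code}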

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5525) Streams reset tool should have same console output with or without dry-run

2017-07-05 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5525:
-
Component/s: streams

> Streams reset tool should have same console output with or without dry-run
> --
>
> Key: KAFKA-5525
> URL: https://issues.apache.org/jira/browse/KAFKA-5525
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> Hi,
> I see that the Streams reset tool provides a console output a little bit 
> different when you execute it using "dry-run" (so without executing any real 
> action) or without it.
> With dry-run :
> {code}
> Dry run displays the actions which will be performed when running Streams 
> Reset Tool
> Following input topics offsets will be reset to beginning (for consumer group 
> streams-wordcount)
> Topic: streams-file-input
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic: streams-wordcount-Counts-repartition
> Topic: streams-wordcount-Counts-changelog
> Done.
> {code}
> without dry-run :
> {code}
> Seek-to-beginning for input topics [streams-file-input]
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic streams-wordcount-Counts-repartition is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Topic streams-wordcount-Counts-changelog is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Done.
> {code}
> I think that the version with dry-run related to show "Seek-to-beginning for 
> input topics [streams-file-input]" could be used even for version without 
> dry-run.
> The output should be consistent and the only difference should be on 
> executing real actions or not.
> I'm working on a trivial PR for a proposal.
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5525) Streams reset tool should have same console output with or without dry-run

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075510#comment-16075510
 ] 

ASF GitHub Bot commented on KAFKA-5525:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3443


> Streams reset tool should have same console output with or without dry-run
> --
>
> Key: KAFKA-5525
> URL: https://issues.apache.org/jira/browse/KAFKA-5525
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> Hi,
> I see that the Streams reset tool provides a console output a little bit 
> different when you execute it using "dry-run" (so without executing any real 
> action) or without it.
> With dry-run :
> {code}
> Dry run displays the actions which will be performed when running Streams 
> Reset Tool
> Following input topics offsets will be reset to beginning (for consumer group 
> streams-wordcount)
> Topic: streams-file-input
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic: streams-wordcount-Counts-repartition
> Topic: streams-wordcount-Counts-changelog
> Done.
> {code}
> without dry-run :
> {code}
> Seek-to-beginning for input topics [streams-file-input]
> Done.
> Deleting all internal/auto-created topics for application streams-wordcount
> Topic streams-wordcount-Counts-repartition is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Topic streams-wordcount-Counts-changelog is marked for deletion.
> Note: This will have no impact if delete.topic.enable is not set to true.
> Done.
> {code}
> I think that the version with dry-run related to show "Seek-to-beginning for 
> input topics [streams-file-input]" could be used even for version without 
> dry-run.
> The output should be consistent and the only difference should be on 
> executing real actions or not.
> I'm working on a trivial PR for a proposal.
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075511#comment-16075511
 ] 

Guozhang Wang commented on KAFKA-5528:
--

Thanks [~mjsax] for adding the FAQ, I think this is indeed a common situation 
that users may run into by mistake.

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from process function. The code is written in Scala and 
> is being launched using sbt (spring isn't involved). Here's the code sketch:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], 
> decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], 
> Array[Byte]] with LazyLogging {
>   private var hsmClient: HSMClient = _
>   override def init(processorContext: ProcessorContext): Unit = { 
> super.init(processorContext) 
> hsmClient = HSMClient(config).getOrElse(null) 
>   }
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = { 
> val topic: String = this.context.topic() 
> val partition: Int = this.context.partition() 
> val offset: Long = this.context.offset() 
> val timestamp: Long = this.context.timestamp() 
> // business logic 
>   }
> }
> {noformat}
> The exception is thrown only for the multi-consumer case (when number of 
> partitions for a topic > 1 and parallelism > 1). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3331) Refactor TopicCommand to make it testable and add unit tests

2017-07-05 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075525#comment-16075525
 ] 

Gwen Shapira commented on KAFKA-3331:
-

+1, that would be awesome
Since it is via a normal API, we can also add stuff like auth, use of ACL, etc

> Refactor TopicCommand to make it testable and add unit tests
> 
>
> Key: KAFKA-3331
> URL: https://issues.apache.org/jira/browse/KAFKA-3331
> Project: Kafka
>  Issue Type: Wish
>Affects Versions: 0.9.0.1
>Reporter: Ashish Singh
>Assignee: Ashish Singh
> Fix For: 0.11.1.0
>
>
> TopicCommand has become a functionality packed, hard to read, class. Adding 
> or changing it with confidence requires some unit tests around it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075541#comment-16075541
 ] 

Guozhang Wang commented on KAFKA-5545:
--

[~yogeshbelur] In your code snippet it seems you did not ever close the 
instance before creating the new instance and then call {{cleanUp}}, or are the 
{{close()}} and {{start()}} calls for the previous instance (it is hard to tell 
how {{setupDiscovery}} is triggered)? 

{code}
close();
streams = new KafkaStreams(buildTopology(config), config);
logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString 
+ "].");
streams.cleanUp();
start();
{code}

Anyway, if {{Streams.close()}} is indeed called, then the producer will be 
closed in that function and the inner {{Sender}} thread will be terminated and 
not try to connect to the broker anymore.
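
For reference, a sketch of the ordering being suggested (names are placeholders; only the 
constructor, {{close}}, {{cleanUp}} and {{start}} calls matter here):

{code}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

final class StreamsRestart {
    static KafkaStreams restart(KafkaStreams oldStreams, TopologyBuilder topology, StreamsConfig config) {
        oldStreams.close(30, TimeUnit.SECONDS);          // close the previous instance first
        KafkaStreams newStreams = new KafkaStreams(topology, config);
        newStreams.cleanUp();                            // cleanUp() must run before start()
        newStreams.start();
        return newStreams;
    }
}
{code}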

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>   

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075541#comment-16075541
 ] 

Guozhang Wang edited comment on KAFKA-5545 at 7/5/17 10:34 PM:
---

[~yogeshbelur] In your code snippet it seems you did not ever close the 
instance before creating the new instance and then call {{cleanUp}}, or are the 
{{close()}} and {{start()}} calls for the previous instance (it is hard to tell 
how {{setupDiscovery}} is triggered)? 

{code}
close();
streams = new KafkaStreams(buildTopology(config), config);
logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString 
+ "].");
streams.cleanUp();
start();
{code}

Anyway, if {{Streams.close()}} is indeed called, then the producer will be 
closed in that function and the inner {{Sender}} thread will be terminated and 
not try to connect to the broker anymore. But from your attached logs it does 
seem the thread was notified to shut down but never exited the main loop:

{code}
10:02:33.981 [pool-1-thread-1] INFO  o.apache.kafka.streams.KafkaStreams - 
stream-client [ks_0_inst] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.987 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed 
thread to shut down
10:02:33.987 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Informed 
thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] Informed 
thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] Informed 
thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] Informed 
thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] Informed 
thread to shut down
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] Informed 
thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] Informed 
thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] Informed 
thread to shut down
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] State 
transition from RUNNING to PENDING_SHUTDOWN

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075558#comment-16075558
 ] 

Guozhang Wang commented on KAFKA-5545:
--

Which {{connection timeout}} config are you referring to?

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.clients

[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records

2017-07-05 Thread Ankush Puri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075624#comment-16075624
 ] 

Ankush Puri commented on KAFKA-4936:


[~damianguy] [~ijuma] Any plans for this feature being added to a future release 
of Kafka?

> Allow dynamic routing of output records
> ---
>
> Key: KAFKA-4936
> URL: https://issues.apache.org/jira/browse/KAFKA-4936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>  Labels: needs-kip
>
> Currently, all used output topics must be known beforehand, and thus, it's not 
> possible to send output records to a topic in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions), etc.
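
For context, with the current DSL every sink topic has to be known when the topology 
is built, e.g. via {{branch()}} plus {{to()}}; a minimal sketch (topic names, predicates, 
and the {{builder}} are made up for illustration):

{code}
KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "events-in");

// All target topics ("errors", "events-out") are fixed at topology-build time.
KStream<String, String>[] branches = source.branch(
        (key, value) -> value.contains("error"),
        (key, value) -> true);
branches[0].to(Serdes.String(), Serdes.String(), "errors");
branches[1].to(Serdes.String(), Serdes.String(), "events-out");
{code}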



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-07-05 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4601:
-
Description: 
Consider the following DSL:

{code}
Stream source = builder.stream(Serdes.String(), 
Serdes.String(), "topic1");
Stream mapped = source.map(..);

KTable counts = mapped
.groupByKey()
.count("Counts");

KStream sink = mapped.leftJoin(counts, ..);
{code}

The resulted topology looks like this:

{code}
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [topic1]
children:   [KSTREAM-MAP-01]
KSTREAM-MAP-01:
children:   
[KSTREAM-FILTER-04, KSTREAM-FILTER-07]
KSTREAM-FILTER-04:
children:   
[KSTREAM-SINK-03]
KSTREAM-SINK-03:
topic:  X-Counts-repartition
KSTREAM-FILTER-07:
children:   
[KSTREAM-SINK-06]
KSTREAM-SINK-06:
topic:  
X-KSTREAM-MAP-01-repartition

ProcessorTopology:
KSTREAM-SOURCE-08:
topics: 
[X-KSTREAM-MAP-01-repartition]
children:   
[KSTREAM-LEFTJOIN-09]
KSTREAM-LEFTJOIN-09:
states: [Counts]
KSTREAM-SOURCE-05:
topics: [X-Counts-repartition]
children:   
[KSTREAM-AGGREGATE-02]
KSTREAM-AGGREGATE-02:
states: [Counts]
{code}

I.e., there are two repartition topics, one for the aggregate and one for the 
join, which not only introduce unnecessary overhead but also mess up the 
processing order (users expect each record to go through the aggregation 
first and then the join operator). In order to get the following simpler 
topology, users today need to add a {{through}} operator after {{map}} manually 
to enforce repartitioning.

{code}
Stream source = builder.stream(Serdes.String(), 
Serdes.String(), "topic1");
Stream repartitioned = source.map(..).through("topic2");

KTable counts = repartitioned
.groupByKey()
.count("Counts");

KStream sink = repartitioned.leftJoin(counts, ..);
{code}

The resulted topology then will look like this:

{code}
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [topic1]
children:   [KSTREAM-MAP-01]
KSTREAM-MAP-01:
children:   
[KSTREAM-SINK-02]
KSTREAM-SINK-02:
topic:  topic 2

ProcessorTopology:
KSTREAM-SOURCE-03:
topics: [topic 2]
children:   
[KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
KSTREAM-AGGREGATE-04:
states: [Counts]
KSTREAM-LEFTJOIN-05:
states: [Counts]
{code} 

This kind of optimization should be automatic in Streams, which we can consider 
doing when extending from one-operator-at-a-time translation.

  was:
Consider the following DSL:

{code}
Stream source = builder.stream(Serdes.String(), 
Serdes.String(), "topic1").map(..);

KTable counts = source
.groupByKey()
.count("Counts");

KStream sink = source.leftJoin(counts, ..);
{code}

The resulted topology looks like this:

{code}
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [topic1]
children:   [KSTREAM-MAP-01]
KSTREAM-MAP-01:
children:   
[KSTREAM-FILTER-04, KSTREAM-FILTER-07]
KSTREAM-FILTER-04:

[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075691#comment-16075691
 ] 

Matthias J. Sax commented on KAFKA-4936:


No concrete roadmap atm. But if anybody wants to pick it up, just go for it :)

> Allow dynamic routing of output records
> ---
>
> Key: KAFKA-4936
> URL: https://issues.apache.org/jira/browse/KAFKA-4936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>  Labels: needs-kip
>
> Currently, all used output topics must be known beforehand, and thus, it's not 
> possible to send output records to a topic in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions), etc.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5559) Metrics should throw if two client registers with same ID

2017-07-05 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-5559:
--

 Summary: Metrics should throw if two client registers with same ID
 Key: KAFKA-5559
 URL: https://issues.apache.org/jira/browse/KAFKA-5559
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 0.11.0.0
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


Currently, {{AppInfoParser}} only logs a WARN message when a bean is registered 
with an existing name. However, this should be treated as an error and the 
exception should be rthrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5559) Metrics should throw if two client registers with same ID

2017-07-05 Thread TAO XIAO (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075784#comment-16075784
 ] 

TAO XIAO commented on KAFKA-5559:
-

Does this PR suggest we need to use a different {{client.id}} per consumer when 
multi-threading in the same JVM? As per my understanding (up to 0.10.0.0), 
{{client.id}} is what is used to identify an application when enabling 
quotas. If this is the case, how do we have a model where multiple consumers in 
the same JVM share the same quota? 

Also, many streaming frameworks, e.g. Storm and Flink, bootstrap 
multiple consumers in the same JVM when working in local mode (especially 
useful for development and debugging purposes); how do we cope with this 
situation as well? 
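
For reference, the scenario in question is two consumers in one JVM configured with the 
same {{client.id}} (and therefore the same quota bucket); a minimal sketch with made-up 
bootstrap, group, and client ids:

{code}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "shared-client-id"); // same id => same quota
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Today the second instance only triggers the AppInfoParser WARN about the
// already-registered MBean; with the change proposed here it would fail instead.
KafkaConsumer<String, String> first = new KafkaConsumer<>(props);
KafkaConsumer<String, String> second = new KafkaConsumer<>(props);
{code}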

> Metrics should throw if two client registers with same ID
> -
>
> Key: KAFKA-5559
> URL: https://issues.apache.org/jira/browse/KAFKA-5559
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, {{AppInfoParser}} only logs a WARN message when a bean is 
> registered with an existing name. However, this should be treated as an error 
> and the exception should be rthrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5167) streams task gets stuck after re-balance due to LockException

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075856#comment-16075856
 ] 

ASF GitHub Bot commented on KAFKA-5167:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3449


> streams task gets stuck after re-balance due to LockException
> -
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>Reporter: Narendra Kumar
>Assignee: Matthias J. Sax
> Fix For: 0.11.0.1, 0.11.1.0
>
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance, the processor node's close() method gets called two times: once 
> from StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field 
> which I am closing in the processor's close method. This instance's close method 
> throws an exception if I call close more than once. Because of this 
> exception, Kafka Streams does not attempt to close the state manager, i.e., 
> task.closeStateManager(true) is never called. When a task moves from one 
> thread to another within the same machine, the task blocks trying to get a lock on 
> the state directory, which is still held by the unclosed state manager, and keeps 
> throwing the warning message below:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will 
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
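
Regarding the double {{close()}} described above: until the fix is picked up, a defensive, 
idempotent {{close()}} in the user's {{Processor}} avoids the secondary exception, so the 
runtime can still release the state directory lock. A minimal sketch (the {{resource}} 
field stands in for whatever the processor holds):

{code}
private AutoCloseable resource;   // whatever the processor needs to release
private boolean closed = false;

@Override
public void close() {
    if (closed) {
        return;                   // second invocation during rebalance becomes a no-op
    }
    closed = true;
    try {
        resource.close();
    } catch (Exception e) {
        // log and swallow, so the state manager can still be closed by the runtime
    }
}
{code}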



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075940#comment-16075940
 ] 

Matthias J. Sax commented on KAFKA-5070:


[~dnagarajan], [~shoyebpathan], [~kchen]

I just reviewed an old comment from above: 
https://issues.apache.org/jira/browse/KAFKA-5070?focusedCommentId=16002228&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16002228

This issue might also occur if Streams cleans the local state directory -- we got 
a similar report on the mailing list recently. Increasing the clean-up interval 
via the {{StreamsConfig}} parameter {{state.cleanup.delay.ms}} (default is 10 
minutes, i.e., 600000 ms) helped to avoid the issue. Can you try this out and report 
back?

We also just merged KAFKA-5167 into {{trunk}} and {{0.11.0}}. It might also be 
worth trying this out to see if it resolves the issue (in case the clean-up 
change does not help).
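
For anyone trying this, the parameter can be set on the {{StreamsConfig}} properties 
before creating the {{KafkaStreams}} instance; a minimal sketch (application id, 
bootstrap servers, the {{builder}}, and the 30-minute value are placeholders):

{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Default is 600000 ms (10 minutes); a larger value gives rebalancing threads
// more time to re-acquire task directories before the cleaner removes them.
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000);

KafkaStreams streams = new KafkaStreams(builder, props);
{code}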

> org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the 
> state directory: /opt/rocksdb/pulse10/0_18
> 
>
> Key: KAFKA-5070
> URL: https://issues.apache.org/jira/browse/KAFKA-5070
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux Version
>Reporter: Dhana
>Assignee: Matthias J. Sax
> Attachments: RocksDB_LockStateDirec.7z
>
>
> Notes: we run two instances of the consumer on two different machines/nodes.
> We have 400 partitions and 200 stream threads per consumer, with 2 consumers.
> We perform an HA test (on rebalance - shutdown of one of the consumers/brokers), and we 
> see this happening.
> Error:
> 2017-04-05 11:36:09.352 WARN  StreamThread:1184 StreamThread-66 - Could not 
> create task 0_115. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock 
> the state directory: /opt/rocksdb/pulse10/0_115
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5397) streams are not recovering from LockException during rebalancing

2017-07-05 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075943#comment-16075943
 ] 

Matthias J. Sax commented on KAFKA-5397:


[~jozi-k] KAFKA-5167 got merged into {{trunk}} and {{0.11.0}} today. Can you 
try this out and see if it fixes this issue?

> streams are not recovering from LockException during rebalancing
> 
>
> Key: KAFKA-5397
> URL: https://issues.apache.org/jira/browse/KAFKA-5397
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: one node setup, confluent kafka broker v3.2.0, 
> kafka-clients 0.11.0.0-SNAPSHOT, 5 threads for kafka-streams
>Reporter: Jozef Koval
>
> Probably continuation of #KAFKA-5167. Portions of log:
> {code}
> 2017-06-07 01:17:52,435 WARN  
> [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-5] StreamTask  
>- task [2_0] Failed offset commits 
> {browser-aggregation-KSTREAM-MAP-39-repartition-0=OffsetAndMetadata{offset=4725597,
>  metadata=''}, 
> browser-aggregation-KSTREAM-MAP-52-repartition-0=OffsetAndMetadata{offset=4968164,
>  metadata=''}, 
> browser-aggregation-KSTREAM-MAP-26-repartition-0=OffsetAndMetadata{offset=2490506,
>  metadata=''}, 
> browser-aggregation-KSTREAM-MAP-65-repartition-0=OffsetAndMetadata{offset=7457795,
>  metadata=''}, 
> browser-aggregation-KSTREAM-MAP-13-repartition-0=OffsetAndMetadata{offset=530888,
>  metadata=''}} due to Commit cannot be completed since the group has already 
> rebalanced and assigned the partitions to another member. This means that the 
> time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time message processing. You can address this either by increasing 
> the session timeout or by reducing the maximum size of batches returned in 
> poll() with max.poll.records.
> 2017-06-07 01:17:52,436 WARN  
> [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-2] StreamTask  
>- task [7_0] Failed offset commits 
> {browser-aggregation-Aggregate-Counts-repartition-0=OffsetAndMetadata{offset=13275085,
>  metadata=''}} due to Commit cannot be completed since the group has already 
> rebalanced and assigned the partitions to another member. This means that the 
> time between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time message processing. You can address this either by increasing 
> the session timeout or by reducing the maximum size of batches returned in 
> poll() with max.poll.records.
> 2017-06-07 01:17:52,488 WARN  
> [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-2] StreamThread
>- stream-thread [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-2] 
> Failed to commit StreamTask 7_0 state: 
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured max.poll.interval.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:798)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:778)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(C

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075945#comment-16075945
 ] 

Yogesh BG commented on KAFKA-5545:
--

Hey

setupDiscovery is a scheduled thread with logic to check whether the broker IPs 
have changed; you can see in the code that I am calling close(), which 
internally calls stream.close(). You can also see in the logs that the close has 
been triggered. If it were not called, how would the shutdowns be initiated?
<>
_But from your attached logs it does seem the thread was notified to shut down 
but never exited the main loop:_

You should check why the shutdown didn't happen. Why are there still threads 
alive that were part of the previous stream instance once close has been 
invoked? Is there any way I can shut down the stream completely without 
restarting the app?

BTW, restarting the application has its own problem: when I do restart with the 
new broker IP, the threads are hung and never come back to process the data.
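
As a side note, the timed {{KafkaStreams.close(timeout, unit)}} overload makes this 
observable: it returns {{false}} if the stream threads do not terminate within the 
timeout. A minimal sketch of the restart sequence under discussion, reusing the 
{{buildTopology}}/{{config}}/{{logger}} names from the snippet quoted earlier in this 
thread:

{code}
// Shut down the old instance first and check whether its threads really exited.
boolean cleanShutdown = streams.close(30, TimeUnit.SECONDS);
if (!cleanShutdown) {
    logger.warn("Stream threads did not terminate within the timeout.");
}

// Only after the old instance is down: wipe local state and start a fresh instance.
streams = new KafkaStreams(buildTopology(config), config);
streams.cleanUp();
streams.start();
{code}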


> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-05 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075945#comment-16075945
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/6/17 5:16 AM:
--

Hey

setupDiscovery is a scheduled thread with logic to check whether the broker IPs 
have changed; you can see in the code that I am calling close(), which 
internally calls stream.close(). You can also see in the logs that the close has 
been triggered. If it were not called, how would the shutdowns be initiated?
<>
_But from your attached logs it does seem the thread was notified to shut down 
but never exited the main loop:_

You should check why the shutdown didn't happen. Why are there still threads 
alive that were part of the previous stream instance once close has been 
invoked? Is there any way I can shut down the stream completely without 
restarting the app?

BTW, restarting the application has its own problem: when I do restart with the 
new broker IP, the threads are hung and never come back to process the data. 



was (Author: yogeshbelur):
Hey

setupDiscovery is scheduled thread, having logic to check the ip's of broker 
has changed or not and then u can see the code i am calling close(), which 
internally calls stream.close();  You can also see the logs that the close has 
been triggered. If not called how shutdowns will be initiated?
<>
_ But from your attached logs it does seems the thread was notified to shutdown 
but never existed the main loop:_

You should check why shutdown didn't happen. why there are some threads still 
alive which were part of the previous stream instance once the close has been 
invoked??? Is there any way i can shutdown the stream completely without 
restarting the app.

BTW restarting application is having its own problem, when i do restart with 
new broker ip threads are hung, never coming back to process the data.


> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.

[jira] [Commented] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075952#comment-16075952
 ] 

ASF GitHub Bot commented on KAFKA-5557:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3488


> Using a logPrefix inside the StreamPartitionAssignor
> 
>
> Key: KAFKA-5557
> URL: https://issues.apache.org/jira/browse/KAFKA-5557
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Trivial
> Fix For: 0.11.1.0
>
>
> Hi,
> the "stream-thread [%s]" is replicated more times in all the logging messages 
> inside the StreamPartitionAssignor. Using a logPrefix like for the 
> StreamThread class could be better.
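
For illustration, the pattern referred to is roughly the one {{StreamThread}} already 
uses: compute the prefix once and prepend it to every message. A schematic sketch 
(field and message names are made up, not the actual StreamPartitionAssignor code):

{code}
private String logPrefix;

@Override
public void configure(Map<String, ?> configs) {
    // Build the "stream-thread [...]" prefix once instead of formatting it per message.
    logPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
}

private void logAssignment(Object assignment) {
    log.info("{}Assigned tasks to clients as {}.", logPrefix, assignment);
}
{code}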



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4827) Kafka connect: error with special characters in connector name

2017-07-05 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076052#comment-16076052
 ] 

Sönke Liebau commented on KAFKA-4827:
-

There are a few more special characters that cause similar behavior: %, ?, <, >, 
/, \, ...

I have blacklisted the obvious ones in the fix for KAFKA-4930, but I think that a 
larger discussion around naming conventions and what is and isn't allowed is 
probably necessary.
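
The failure mode in the stack trace quoted below is easy to reproduce in isolation; this 
one-liner mirrors what the REST resource effectively does with the unescaped name:

{code}
// Throws java.lang.IllegalArgumentException: Illegal character in path ...
java.net.URI.create("/connectors/file-connector\r");
{code}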

> Kafka connect: error with special characters in connector name
> --
>
> Key: KAFKA-4827
> URL: https://issues.apache.org/jira/browse/KAFKA-4827
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.1.0
>Reporter: Aymeric Bouvet
>Priority: Minor
>
> When creating a connector, if the connector name (and possibly other 
> properties) ends with a carriage return, kafka-connect will create the config 
> but report an error
> {code}
> cat << EOF > file-connector.json
> {
>   "name": "file-connector\r",
>   "config": {
> "topic": "kafka-connect-logs\r",
> "tasks.max": "1",
> "file": "/var/log/ansible-confluent/connect.log",
> "connector.class": 
> "org.apache.kafka.connect.file.FileStreamSourceConnector"
>   }
> }
> EOF
> curl -X POST -H "Content-Type: application/json" -H "Accept: 
> application/json" -d @file-connector.json localhost:8083/connectors 
> {code}
> returns an error 500 and logs the following
> {code}
> [2017-03-01 18:25:23,895] WARN  (org.eclipse.jetty.servlet.ServletHandler)
> javax.servlet.ServletException: java.lang.IllegalArgumentException: Illegal 
> character in path at index 27: /connectors/file-connector4
> at 
> org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
> at 
> org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
> at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
> at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
> at 
> org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
> at 
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
> at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
> at 
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
> at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
> at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
> at 
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
> at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
> at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
> at 
> org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
> at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
> at 
> org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
> at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
> at org.eclipse.jetty.server.Server.handle(Server.java:499)
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
> at 
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
> at 
> org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
> at 
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
> at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: Illegal character in path at 
> index 27: /connectors/file-connector4
> at java.net.URI.create(URI.java:852)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:100)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
> at 
> org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
> at 
>