[GitHub] kafka-site pull request #16: Update design.html

2016-07-07 Thread nihed
Github user nihed closed the pull request at:

https://github.com/apache/kafka-site/pull/16


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1067) the default partitioner should be randomizing messages and a new partition for the meta refresh requirements created

2016-07-07 Thread Asaf Mesika (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365726#comment-15365726
 ] 

Asaf Mesika commented on KAFKA-1067:


After hitting this issue in production and searching for a long time before 
finding this FAQ, I'd recommend documenting it where you search first (at 
least where I did): in the code. Have the Javadoc of the default partitioner 
document this weird behavior.
Next would be to document it in the old consumer docs on the site.
I only got to this FAQ through someone who knew about this issue, in the local 
Java forum (Java.IL).
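For readers who haven't hit it: the surprising behavior under discussion is that the old producer's default partitioner, given a null key, picks a random partition and then reuses it until the next metadata refresh (by default minutes apart), so traffic sticks to one partition. A standalone sketch of that pattern; this is an illustration, not the actual Kafka code, and the class and method names are invented:

```java
import java.util.Random;

public class StickyRandomPartitioner {
    private final Random random = new Random();
    private final int numPartitions;
    private int cached = -1; // partition chosen at the last metadata refresh

    StickyRandomPartitioner(int numPartitions) { this.numPartitions = numPartitions; }

    // Called on every send with a null key; refresh=true models a metadata
    // refresh, which is the only time a new random partition is chosen.
    int partition(boolean refresh) {
        if (cached < 0 || refresh) {
            cached = random.nextInt(numPartitions);
        }
        return cached; // same partition for every message until a refresh
    }

    public static void main(String[] args) {
        StickyRandomPartitioner p = new StickyRandomPartitioner(8);
        int first = p.partition(false);
        boolean allSame = true;
        for (int i = 0; i < 1000; i++) {
            allSame &= p.partition(false) == first;
        }
        System.out.println(allSame); // every send landed on one partition
    }
}
```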


> the default partitioner should be randomizing messages and a new partition 
> for the meta refresh requirements created
> 
>
> Key: KAFKA-1067
> URL: https://issues.apache.org/jira/browse/KAFKA-1067
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
> Fix For: 0.8.2.0
>
>
> Details behind this 
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Pros and cons of dockerizing kafka brokers?

2016-07-07 Thread Krish
Hi,
I am currently testing a custom Docker volume driver plugin for AWS EFS/EBS
access and mounting, so running the Kafka broker inside a container would ease
a lot of configuration issues around storage for me.

Are there any pros and cons of dockerizing the Kafka broker?
Off the top of my head, since Kafka forms the base of our setup, I can
think of making it use the host networking stack and increasing the ulimits
for the container.
I would like to know if and when Kafka becomes greedy and cannibalizes
resources; I can also ensure that it runs on a dedicated machine.

Thanks.

Best,
Krish


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-07 Thread Damian Guy
Thanks Henry - we've updated the KIP with an example and the new config
parameter required. FWIW the user doesn't register a listener, they provide
a host:port in config. It is expected they will start a service running on
that host:port that they can use to connect to the running KafkaStreams
instance.

Thanks,
Damian

On Thu, 7 Jul 2016 at 06:06 Henry Cai  wrote:

> It wasn't quite clear to me how the user program interacts with the
> discovery API, especially on the user supplied listener part, how does the
> user program supply that listener to KafkaStreams and how does KafkaStreams
> know which port the user listener is running, maybe a more complete
> end-to-end example including the steps on registering the user listener and
> whether the user listener needs to be involved with task reassignment.
>
>
> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang  wrote:
>
> > +1
> >
> > On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy 
> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to initiate the voting process for KIP-67
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > > >
> > >
> > > KAFKA-3909  is the
> top
> > > level JIRA for this effort.
> > >
> > > Initial PRs for Step 1 of the process are:
> > > Expose State Store Names 
> and
> > > Query Local State Stores 
> > >
> > > Thanks,
> > > Damian
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


__consumer_offsets leader is -1

2016-07-07 Thread Kafka
Hi, the leader of __consumer_offsets partitions 7 and 27 is -1, and the ISR
is empty. Can anyone tell me how to recover them? Thank you.

Topic: __consumer_offsets   Partition: 0    Leader: 3    Replicas: 3,4,5  Isr: 4,5,3
Topic: __consumer_offsets   Partition: 1    Leader: 4    Replicas: 4,5,0  Isr: 0,4,5
Topic: __consumer_offsets   Partition: 2    Leader: 5    Replicas: 5,0,1  Isr: 0,5,1
Topic: __consumer_offsets   Partition: 3    Leader: 0    Replicas: 0,1,3  Isr: 0,1,3
Topic: __consumer_offsets   Partition: 4    Leader: 1    Replicas: 1,3,4  Isr: 4,1,3
Topic: __consumer_offsets   Partition: 5    Leader: 3    Replicas: 3,5,0  Isr: 5,3,0
Topic: __consumer_offsets   Partition: 6    Leader: 4    Replicas: 4,0,1  Isr: 0,4,1
Topic: __consumer_offsets   Partition: 7    Leader: -1   Replicas: 5,1,3  Isr:
Topic: __consumer_offsets   Partition: 8    Leader: 0    Replicas: 0,3,4  Isr: 0,4,3
Topic: __consumer_offsets   Partition: 9    Leader: 1    Replicas: 1,4,5  Isr: 4,5,1
Topic: __consumer_offsets   Partition: 10   Leader: 3    Replicas: 3,0,1  Isr: 0,1,3
Topic: __consumer_offsets   Partition: 11   Leader: 4    Replicas: 4,1,3  Isr: 4,1,3
Topic: __consumer_offsets   Partition: 12   Leader: 5    Replicas: 5,3,4  Isr: 4,5,3
Topic: __consumer_offsets   Partition: 13   Leader: 0    Replicas: 0,4,5  Isr: 0,4,5
Topic: __consumer_offsets   Partition: 14   Leader: 1    Replicas: 1,5,0  Isr: 0,5,1
Topic: __consumer_offsets   Partition: 15   Leader: 3    Replicas: 3,1,4  Isr: 1,4,3
Topic: __consumer_offsets   Partition: 16   Leader: 4    Replicas: 4,3,5  Isr: 4,5,3
Topic: __consumer_offsets   Partition: 17   Leader: 5    Replicas: 5,4,0  Isr: 0,4,5
Topic: __consumer_offsets   Partition: 18   Leader: 0    Replicas: 0,5,1  Isr: 0,5,1
Topic: __consumer_offsets   Partition: 19   Leader: 1    Replicas: 1,0,3  Isr: 0,1,3
Topic: __consumer_offsets   Partition: 20   Leader: 3    Replicas: 3,4,5  Isr: 4,5,3
Topic: __consumer_offsets   Partition: 21   Leader: 4    Replicas: 4,5,0  Isr: 0,4,5
Topic: __consumer_offsets   Partition: 22   Leader: 5    Replicas: 5,0,1  Isr: 0,5,1
Topic: __consumer_offsets   Partition: 23   Leader: 0    Replicas: 0,1,3  Isr: 0,1,3
Topic: __consumer_offsets   Partition: 24   Leader: 1    Replicas: 1,3,4  Isr: 4,1,3
Topic: __consumer_offsets   Partition: 25   Leader: 3    Replicas: 3,5,0  Isr: 5,3,0
Topic: __consumer_offsets   Partition: 26   Leader: 4    Replicas: 4,0,1  Isr: 0,4,1
Topic: __consumer_offsets   Partition: 27   Leader: -1   Replicas: 5,1,3  Isr:


[jira] [Created] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-3933:
---

 Summary: Kafka OOM During Log Recovery Due to Leaked Native Memory
 Key: KAFKA-3933
 URL: https://issues.apache.org/jira/browse/KAFKA-3933
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2
 Environment: Linux, latest oracle java-8
Reporter: Tom Crayford


Hi there. We've been tracking an issue where Kafka hits a 
java.lang.OutOfMemoryError during log recovery.
After a bunch of tracking work, we've realized we've hit an instance of a 
long-known issue: http://www.evanjones.ca/java-native-leak-bug.html

TLDR: Kafka breaks the rule "Always close GZIPInputStream and GZIPOutputStream 
since they use native memory via zlib" from that article.

As such, during broker startup, when you're recovering log segments that have 
been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
Our crashes during startup have this profile: the JVM heap is nearly empty (a 
few hundred MB), but the offheap memory is full of allocations caused by 
`Java_java_util_zip_Deflater_init` and `deflateInit2`.
This leads to broker crashes during startup. The only real mitigation is having 
*far* more memory than you need to boot (which I'd guess is why folks haven't 
noticed this in production that much yet).
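The rule from that article can be followed mechanically in Java with try-with-resources, which closes the stream (and releases its native zlib state) even when only part of it is read. A minimal self-contained sketch, not Kafka code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCloseDemo {
    // Compress a payload; try-with-resources guarantees close(), which
    // finishes the stream and frees the native deflater state.
    static byte[] gzip(byte[] plain) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(plain);
        }
        return bos.toByteArray();
    }

    // Read only the first byte, just as log recovery reads only the first
    // offset; close() still runs deterministically, so freeing the native
    // buffers never depends on GC running the finalizer.
    static int firstByte(byte[] compressed) throws Exception {
        try (GZIPInputStream in =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.read();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] compressed = gzip("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(firstByte(compressed)); // 'h' == 104
    }
}
```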

To dig into the code more (this is based on trunk). Log recovery on unflushed 
segments eventually calls `LogSegment.recover`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172

On compressed segments, that leads to a call to `deepIterator`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189

That leads to a call to `CompressionFactory`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
 which creates a `GZIPInputStream`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46

That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
means that the finalizer on `GZIPInputStream` that deallocates the native 
buffers is never called, because GC is never triggered. Instead, we just 
exhaust the offheap memory and then Kafka dies from an OutOfMemory error.

Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
reading the whole input stream (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
 When it's performing log recovery, in 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
 it doesn't read to the end of the stream, but instead reads the first offset 
and leaves things alone.

This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
same way. I think (but haven't 100% verified) that it impacts all versions of 
Kafka that are supported (0.8 -> 0.10).

Fixing this seems relatively annoying, but only because of some "small matters 
of coding", nothing hugely problematic.

The main issue is that `deepIterator` only returns an `Iterator`, which doesn't 
have a `close()` method of any kind. We could create a new `CloseableIterator` 
trait and have it extend Java's `AutoCloseable` 
(https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), then 
explicitly call `close()` everywhere we use a `deepIterator()` and don't always 
read to the end. Scala unfortunately doesn't have a built-in version of 
Java's `try-with-resources` statement, but we can explicitly call close 
everywhere perfectly happily.
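A minimal sketch of what such a closeable iterator could look like (shown on the Java side for brevity; the `CloseableIterator` and `over` names are illustrative, not Kafka's actual API):

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical: an Iterator that is also AutoCloseable, so callers that stop
// early (like log recovery reading only the first offset) can still release
// underlying resources via try-with-resources.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // narrowed throws clause for caller convenience
}

public class CloseableIteratorDemo {
    // Wraps a plain iterator and runs a cleanup action (e.g. stream.close())
    // when the iterator itself is closed.
    static CloseableIterator<Integer> over(int[] values, Runnable onClose) {
        Iterator<Integer> it = Arrays.stream(values).iterator();
        return new CloseableIterator<Integer>() {
            public boolean hasNext() { return it.hasNext(); }
            public Integer next()    { return it.next(); }
            public void close()      { onClose.run(); }
        };
    }

    public static void main(String[] args) {
        boolean[] closed = {false};
        try (CloseableIterator<Integer> it = over(new int[]{1, 2, 3},
                                                  () -> closed[0] = true)) {
            System.out.println(it.next()); // consume only the first element
        }
        System.out.println(closed[0]); // cleanup ran despite stopping early
    }
}
```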

Another (but much more hacky) solution would be to always read to the end of 
the iterator in `LogSegment.recover`, but that seems pretty bad, using far more 
resources than is needed during recovery.

I can't think of any other reasonable solutions for now, but would love to hear 
input from the community.

We're happy doing the work of developing a patch, but thought we'd report the 
issue before starting down that path.





[GitHub] kafka pull request #1421: KAFKA-3680: Enable Kafka clients to run in any cla...

2016-07-07 Thread rajinisivaram
GitHub user rajinisivaram reopened a pull request:

https://github.com/apache/kafka/pull/1421

KAFKA-3680: Enable Kafka clients to run in any classloader env

Configure default classes using class objects instead of class names, 
enable configurable lists of classes to be specified as class objects, add 
tests for different classloader configurations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-3680

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1421


commit 364d42f5a2297cd75151d0f4d83f48a41b817f3f
Author: Rajini Sivaram 
Date:   2016-05-24T09:52:48Z

KAFKA-3680: Enable Kafka clients to run in any classloader env






[GitHub] kafka pull request #1421: KAFKA-3680: Enable Kafka clients to run in any cla...

2016-07-07 Thread rajinisivaram
Github user rajinisivaram closed the pull request at:

https://github.com/apache/kafka/pull/1421




[jira] [Commented] (KAFKA-3680) Make Java client classloading more flexible

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366109#comment-15366109
 ] 

ASF GitHub Bot commented on KAFKA-3680:
---

GitHub user rajinisivaram reopened a pull request:

https://github.com/apache/kafka/pull/1421

KAFKA-3680: Enable Kafka clients to run in any classloader env

Configure default classes using class objects instead of class names, 
enable configurable lists of classes to be specified as class objects, add 
tests for different classloader configurations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-3680

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1421


commit 364d42f5a2297cd75151d0f4d83f48a41b817f3f
Author: Rajini Sivaram 
Date:   2016-05-24T09:52:48Z

KAFKA-3680: Enable Kafka clients to run in any classloader env




> Make Java client classloading more flexible
> ---
>
> Key: KAFKA-3680
> URL: https://issues.apache.org/jira/browse/KAFKA-3680
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> JIRA corresponding to 
> [KIP-60|https://cwiki.apache.org/confluence/display/KAFKA/KIP-60+-+Make+Java+client+classloading+more+flexible]
>  to enable classloading of default classes and custom classes to work in 
> different classloading environments.





[jira] [Commented] (KAFKA-3680) Make Java client classloading more flexible

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366108#comment-15366108
 ] 

ASF GitHub Bot commented on KAFKA-3680:
---

Github user rajinisivaram closed the pull request at:

https://github.com/apache/kafka/pull/1421







[jira] [Updated] (KAFKA-3680) Make Java client classloading more flexible

2016-07-07 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3680:
--
Fix Version/s: 0.10.1.0
   Status: Patch Available  (was: Open)






Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-07 Thread Michael Noll
+1 (non-binding)

On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy  wrote:

> Thanks Henry - we've updated the KIP with an example and the new config
> parameter required. FWIW the user doesn't register a listener, they provide
> a host:port in config. It is expected they will start a service running on
> that host:port that they can use to connect to the running KafkaStreams
> Instance.
>
> Thanks,
> Damian
>
> On Thu, 7 Jul 2016 at 06:06 Henry Cai  wrote:
>
> > It wasn't quite clear to me how the user program interacts with the
> > discovery API, especially on the user supplied listener part, how does
> the
> > user program supply that listener to KafkaStreams and how does
> KafkaStreams
> > know which port the user listener is running, maybe a more complete
> > end-to-end example including the steps on registering the user listener
> and
> > whether the user listener needs to be involved with task reassignment.
> >
> >
> > On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang 
> wrote:
> >
> > > +1
> > >
> > > On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to initiate the voting process for KIP-67
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > > > >
> > > >
> > > > KAFKA-3909  is the
> > top
> > > > level JIRA for this effort.
> > > >
> > > > Initial PRs for Step 1 of the process are:
> > > > Expose State Store Names 
> > and
> > > > Query Local State Stores 
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
Best regards,
Michael Noll



Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download


[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-07 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366124#comment-15366124
 ] 

Eno Thereska commented on KAFKA-3101:
-

[~bbejeck] Sounds great. So the KIP is KIP-63, and the main JIRA has now moved 
to KAFKA-3776. There are a few subtasks under it that you can pick up as you 
go. Thanks!

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, 
> oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results. 
> Instead, if we can let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only gets updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key whether their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 
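As a hedged illustration of the proposed scheme (not Kafka Streams code; the names and the flush-based emit trigger are assumptions standing in for "based on some configs"), a store can keep the last emitted old value per key and emit only on flush:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: each entry holds the latest aggregate and the value last emitted
// downstream. Updates only overwrite the latest value; a flush (e.g. on a
// configured commit interval) emits one <new, old> change per dirty key.
public class AggEmitCache {
    static final class Entry { String latest; String lastEmitted; }

    final Map<String, Entry> store = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    void update(String key, String value) {
        store.computeIfAbsent(key, k -> new Entry()).latest = value;
    }

    void flush() {
        for (Entry e : store.values()) {
            if (!e.latest.equals(e.lastEmitted)) {
                emitted.add("<" + e.latest + ", " + e.lastEmitted + ">");
                e.lastEmitted = e.latest; // remember what went downstream
            }
        }
    }

    public static void main(String[] args) {
        AggEmitCache c = new AggEmitCache();
        c.update("k", "V1"); c.flush();           // emits <V1, null>
        c.update("k", "V2"); c.update("k", "V3"); // absorbed, no emits
        c.flush();                                 // emits <V3, V1>
        System.out.println(c.emitted);
    }
}
```

Three updates collapse into two emitted changes here; the intermediate <V2, V1> is never computed or sent.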





[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-07 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366148#comment-15366148
 ] 

Bill Bejeck commented on KAFKA-3101:


[~enothereska] ok I get it now, KAFKA-3101 is being replaced/superseded by 
KAFKA-3776. Thanks for the heads up. 

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>





[VOTE] KIP-55: Secure quotas for authenticated users

2016-07-07 Thread Rajini Sivaram
I would like to initiate voting for KIP-55 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users).
Since the KIP has changed quite a lot since the last vote, we will discard
the previous vote and start this new voting thread.

KIP-55 extends the existing client-id quota implementation to enable secure
quotas for multi-user environments. The KIP proposes a flexible, unified
design that supports quotas at <user>, <user, client-id> or <client-id>
levels. It retains compatibility with the existing <client-id> quotas when
new user-level quotas are not configured.

Thank you...


Regards,

Rajini


[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366174#comment-15366174
 ] 

Ismael Juma commented on KAFKA-3933:


Nice catch. The suggested solution sounds good to me, but I think the 
`CloseableIterator` should probably live in `common` as `RecordsIterator` 
probably has a similar issue?

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford





[jira] [Updated] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3933:
---
Priority: Critical  (was: Major)

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflatInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.
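For illustration, the proposed `ClosableIterator` idea can be sketched in plain Java (the trait name is the reporter's proposal; `GzipByteIterator` and its helpers are hypothetical, not Kafka code). The point is that `close()` releases zlib's native buffers even when the stream is only partially consumed, which is the log-recovery case that leaks:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// The iterator is both an Iterator and AutoCloseable, so callers that stop
// early (as LogSegment.recover does) can still free the native zlib buffers.
interface ClosableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close() throws IOException;
}

public final class GzipByteIterator implements ClosableIterator<Integer> {
    private final GZIPInputStream in;
    private int nextByte;

    public GzipByteIterator(byte[] gzipped) throws IOException {
        this.in = new GZIPInputStream(new ByteArrayInputStream(gzipped));
        this.nextByte = in.read();
    }

    @Override
    public boolean hasNext() {
        return nextByte != -1;
    }

    @Override
    public Integer next() {
        if (nextByte == -1) throw new NoSuchElementException();
        int current = nextByte;
        try {
            nextByte = in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return current;
    }

    // Closing deallocates the native buffers without relying on finalization.
    @Override
    public void close() throws IOException {
        in.close();
    }

    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        return bos.toByteArray();
    }
}
```

In Java the caller would use try-with-resources; in Scala, a loan-pattern helper that invokes `close()` in a `finally` block gives the same guarantee.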



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3933:
---
Fix Version/s: 0.10.0.1

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits a 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflateInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.





[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-07 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366230#comment-15366230
 ] 

Eno Thereska commented on KAFKA-3101:
-

Yup. Thanks.

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of 
> Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results. 
> Instead if we can let the underlying state store to "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only get updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that, we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key if their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 
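The update rules in the example above can be sketched as a small standalone class. This is a hedged illustration only: the class and method names are made up, and a simple emit-every-N-updates policy stands in for the unspecified configs:

```java
import java.util.HashMap;
import java.util.Map;

public final class EmitSuppressingAggStore<K, V> {
    /** A hypothetical Change pair, mirroring the <newValue, oldValue> pairs above. */
    public record Change<V>(V newValue, V oldValue) {}

    /** Per-key state: current aggregate, last emitted value, updates since last emit. */
    private record Entry<V>(V current, V lastEmitted, int updatesSinceEmit) {}

    private final Map<K, Entry<V>> store = new HashMap<>();
    private final int emitEveryN;

    public EmitSuppressingAggStore(int emitEveryN) {
        this.emitEveryN = emitEveryN;
    }

    /** Apply a new aggregate value; return a Change to emit, or null to suppress. */
    public Change<V> update(K key, V newValue) {
        Entry<V> e = store.get(key);
        if (e == null) {
            // First value for this key: emit <V1, null> and remember V1 as emitted.
            store.put(key, new Entry<>(newValue, newValue, 0));
            return new Change<>(newValue, null);
        }
        int n = e.updatesSinceEmit() + 1;
        if (n >= emitEveryN) {
            // Time to emit: pair the new value with the last emitted old value.
            Change<V> out = new Change<>(newValue, e.lastEmitted());
            store.put(key, new Entry<>(newValue, newValue, 0));
            return out;
        }
        // Suppress: update only the current value, keep lastEmitted unchanged.
        store.put(key, new Entry<>(newValue, e.lastEmitted(), n));
        return null;
    }
}
```

A real implementation would also need the "closing" time control mentioned above, flushing any key whose current pair has not yet been emitted.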





[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2016-07-07 Thread Chris Rodier (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366237#comment-15366237
 ] 

Chris Rodier commented on KAFKA-2729:
-

We also observed this identical issue, on 0.9.0.1 today.  Restart of the failed 
broker resolved the issue without difficulty as a work around.  This seems like 
a high priority issue where you could lose nodes, and/or lose a cluster fairly 
easily due to zookeeper instability / elections.



> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.





[jira] [Created] (KAFKA-3934) kafka-server-start.sh enables GC by default with no way to disable

2016-07-07 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-3934:
--

 Summary: kafka-server-start.sh enables GC by default with no way 
to disable
 Key: KAFKA-3934
 URL: https://issues.apache.org/jira/browse/KAFKA-3934
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke


In KAFKA-1127 the following line was added to kafka-server-start.sh:

{noformat}
EXTRA_ARGS="-name kafkaServer -loggc"
{noformat}

This prevents gc logging from being disabled without some unusual environment 
variable workarounds. 

I suggest EXTRA_ARGS is made overridable like below: 

{noformat}
if [ "x$EXTRA_ARGS" = "x" ]; then
export EXTRA_ARGS="-name kafkaServer -loggc"
fi
{noformat}

*Note:* I am also not sure I understand why the existing code uses the "x" 
thing when checking the variable instead of the following:

{noformat}
export EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
{noformat}

This lets the variable be overridden to "" without taking the default. 

*Workaround:* As a workaround the user should be able to set $KAFKA_GC_LOG_OPTS 
to fit their needs. Since kafka-run-class.sh will not ignore the -loggc 
parameter if that is set. 

{noformat}
-loggc)
  if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
GC_LOG_ENABLED="true"
  fi
  shift
{noformat}
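As an aside on the `${EXTRA_ARGS-...}` form: the difference from the "x" test is visible directly in the shell (the default value here is just the one from the script):

```shell
# ${VAR-default} substitutes the default only when VAR is unset; an explicitly
# empty VAR is preserved. The [ "x$VAR" = "x" ] test cannot distinguish unset
# from empty, so it would re-apply the default even after EXTRA_ARGS="".
DEFAULT="-name kafkaServer -loggc"

unset EXTRA_ARGS
echo "unset:    [${EXTRA_ARGS-$DEFAULT}]"   # default is applied

EXTRA_ARGS=""
echo "empty:    [${EXTRA_ARGS-$DEFAULT}]"   # stays empty, default NOT applied

EXTRA_ARGS="-name kafkaServer"
echo "override: [${EXTRA_ARGS-$DEFAULT}]"   # user value wins
```

This is exactly what lets the variable be overridden to "" without taking the default.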





join in the dev group

2016-07-07 Thread caizhiqi...@kingsoft.com

hello,
 i wanna join and learn more kafka. thanks so much .




caizhiqi...@kingsoft.com


Using KIP Process in Apache Flink

2016-07-07 Thread Aljoscha Krettek
Hi Kafka Community,
I'm a member of the Flink community and I would like to ask if you're okay
with us using your KIP process for Flink. We are at a stage where we have
to be more careful in planning big feature additions to Flink and are
looking for ways to formalize our processes there. I feel we don't have to
reinvent the wheel if other projects already put thought into this and
established a process that works.

So, would you be okay with us copying the structures and docs that you have
in place for the KIP process? With adaptions to suit Flink, of course.

Best,
Aljoscha


Re: Using KIP Process in Apache Flink

2016-07-07 Thread Ismael Juma
Hi Aljoscha,

Thanks for sharing your intention to use a process similar to our KIP
process. You are more than welcome to copy the structures and docs that we
have for the KIP process. :)

Ismael

On Thu, Jul 7, 2016 at 4:16 PM, Aljoscha Krettek 
wrote:

> Hi Kafka Community,
> I'm a member of the Flink community and I would like to ask if you're okay
> with us using your KIP process for Flink. We are at a stage where we have
> to be more careful in planning big feature additions to Flink and are
> looking for ways to formalize our processes there. I feel we don't have to
> reinvent the wheel if other projects already put thought into this and
> established a process that works.
>
> So, would you be okay with us copying the structures and docs that you have
> in place for the KIP process? With adaptions to suit Flink, of course.
>
> Best,
> Aljoscha
>


Re: join in the dev group

2016-07-07 Thread Matthias J. Sax
Hi,

if you want to subscribe to dev list, you need to send an email to
dev-subscr...@kafka.apache.org

See: https://kafka.apache.org/contact.html


-Matthias

On 07/07/2016 06:04 AM, caizhiqi...@kingsoft.com wrote:
> 
> hello,
>  i wanna join and learn more kafka. thanks so much .
> 
> 
> 
> 
> caizhiqi...@kingsoft.com
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366329#comment-15366329
 ] 

Tom Crayford commented on KAFKA-3933:
-

That makes sense to me. Thanks for the pointer.

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>





[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts

2016-07-07 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366339#comment-15366339
 ] 

Jun Rao commented on KAFKA-3894:


[~wushujames], this is slightly different from KAFKA-3810. In KAFKA-3810, 
messages are bounded by MaxMessageSize, which in turn bounds the fetch response 
size. For cleaning, if messages are uncompressed, the dedupBufferSize needed is 
bounded by segmentSize/perMessageOverhead. However, if messages are compressed, 
dedupBufferSize needed could be arbitrarily large. So, I am not sure if we want 
to auto grow the buffer size arbitrarily. 

#4 seems to be a safer approach. There are effective ways of estimating the 
number of unique keys 
(https://people.mpi-inf.mpg.de/~rgemulla/publications/beyer07distinct.pdf) 
incrementally. We will need to figure out where to store it in order to avoid 
rescanning the log on startup. 
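One incremental estimator in that family is the k-minimum-values (KMV) sketch. A toy, self-contained version follows (hedged: this illustrates the general distinct-counting idea, not the exact algorithm from the cited paper, and it is not Kafka code):

```java
import java.util.TreeSet;

// KMV sketch: track the k smallest hash values seen. If keys hash uniformly
// into [0, MAX], the k-th smallest hash sits at roughly k/D of the range for
// D distinct keys, so D can be estimated from it. Duplicates hash identically
// and so never change the sketch -- that is what makes it a *distinct* counter.
public final class KmvDistinctEstimator {
    private final int k;
    private final TreeSet<Long> smallest = new TreeSet<>();

    public KmvDistinctEstimator(int k) {
        this.k = k;
    }

    public void offer(byte[] key) {
        long h = hash(key) & Long.MAX_VALUE;  // keep hashes non-negative
        smallest.add(h);                      // TreeSet dedupes equal hashes
        if (smallest.size() > k) {
            smallest.pollLast();              // keep only the k smallest
        }
    }

    public long estimate() {
        if (smallest.size() < k) {
            return smallest.size();           // fewer than k keys: exact count
        }
        // k-th smallest hash at fraction (k-1)/D of [0, Long.MAX_VALUE].
        double kth = smallest.last();
        return Math.round((k - 1) * (Long.MAX_VALUE / kth));
    }

    // Simple polynomial hash; a production sketch would use a stronger hash.
    private static long hash(byte[] key) {
        long h = 1125899906842597L;
        for (byte b : key) h = 31 * h + b;
        return h;
    }
}
```

The open question raised above, where to persist the running estimate so the log need not be rescanned on startup, is orthogonal to the sketch itself.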

> Log Cleaner thread crashes and never restarts
> -
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>  Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM





[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-07-07 Thread Rajneesh Mitharwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366378#comment-15366378
 ] 

Rajneesh Mitharwal commented on KAFKA-2985:
---

I am also facing this problem. Broker 0.9.0.1 and Java client 0.9.0.1

```
Thu Jul 07 16:32:03,780 2016 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:? ERROR 
pool-3-thread-9 Error UNKNOWN_MEMBER_ID occurred while committing offsets for 
group test-consumer
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed due to group rebalance
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
 ~[kafka-clients-0.9.0.1.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
 ~[kafka-clients-0.9.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) 
~[kafka-clients-0.9.0.1.jar:na]
```

1. To resolve this issue I put retries on commitSync and poll more data even if 
the previous batch's commitSync failed, but this is not desirable since it 
results in duplicate processing. 
2. Increased heartbeat and session timeouts.
3. Decreased `max.partition.fetch.bytes`, but I can't set its value too low 
either, as that results in a `large message detected` error.  
4. The `max.poll.records` setting is not applicable for me, and my record sizes 
are not uniform: some are 1-2 KB while some are ~500 KB. Processing of records 
is not guaranteed to complete within a small session timeout like 30 
seconds if, say, 1000 records of size 0.5 KB are received while  
`max.partition.fetch.bytes=50` 

 

 

> Consumer group stuck in rebalancing state
> -
>
> Key: KAFKA-2985
> URL: https://issues.apache.org/jira/browse/KAFKA-2985
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0
> Environment: Kafka 0.9.0.0.
> Kafka Java consumer 0.9.0.0
> 2 Java producers.
> 3 Java consumers using the new consumer API.
> 2 kafka brokers.
>Reporter: Jens Rantil
>Assignee: Jason Gustafson
>
> We've been doing some load testing on Kafka. _After_ the load test, when our 
> consumers are done, we have two times now seen Kafka become stuck in consumer 
> group rebalancing. This is after all our consumers are done consuming and 
> essentially polling periodically without getting any records.
> The brokers list the consumer group (named "default"), but I can't query the 
> offsets:
> {noformat}
> jrantil@queue-0:/srv/kafka/kafka$ ./bin/kafka-consumer-groups.sh 
> --new-consumer --bootstrap-server localhost:9092 --list
> default
> jrantil@queue-0:/srv/kafka/kafka$ ./bin/kafka-consumer-groups.sh 
> --new-consumer --bootstrap-server localhost:9092 --describe --group 
> default|sort
> Consumer group `default` does not exist or is rebalancing.
> {noformat}
> Retrying to query the offsets for 15 minutes or so still said it was 
> rebalancing. After restarting our first broker, the group immediately started 
> rebalancing. That broker was logging this before restart:
> {noformat}
> [2015-12-12 13:09:48,517] INFO [Group Metadata Manager on Broker 0]: Removed 
> 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2015-12-12 13:10:16,139] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 16 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:10:16,141] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 16 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:10:16,575] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 16 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,141] INFO [GroupCoordinator 0]: Stabilized group default 
> 

Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-07 Thread Henry Cai
+1

On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll  wrote:

> +1 (non-binding)
>
> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy  wrote:
>
> > Thanks Henry - we've updated the KIP with an example and the new config
> > parameter required. FWIW the user doesn't register a listener, they
> provide
> > a host:port in config. It is expected they will start a service running
> on
> > that host:port that they can use to connect to the running KafkaStreams
> > Instance.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 7 Jul 2016 at 06:06 Henry Cai 
> wrote:
> >
> > > It wasn't quite clear to me how the user program interacts with the
> > > discovery API, especially on the user supplied listener part, how does
> > the
> > > user program supply that listener to KafkaStreams and how does
> > KafkaStreams
> > > know which port the user listener is running, maybe a more complete
> > > end-to-end example including the steps on registering the user listener
> > and
> > > whether the user listener needs to be involved with task reassignment.
> > >
> > >
> > > On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang 
> > wrote:
> > >
> > > > +1
> > > >
> > > > On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy 
> > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I'd like to initiate the voting process for KIP-67
> > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > > > > >
> > > > >
> > > > > KAFKA-3909  is
> the
> > > top
> > > > > level JIRA for this effort.
> > > > >
> > > > > Initial PRs for Step 1 of the process are:
> > > > > Expose State Store Names <
> https://github.com/apache/kafka/pull/1526>
> > > and
> > > > > Query Local State Stores <
> https://github.com/apache/kafka/pull/1565>
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> Best regards,
> Michael Noll
>
>
>
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860*
> *Download Apache Kafka and Confluent Platform: www.confluent.io/download*
>


[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2016-07-07 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366402#comment-15366402
 ] 

Vahid Hashemian commented on KAFKA-3932:


Can what you suggest be implemented with consumer's {{seek(TopicPartition 
partition, long offset)}} method on the client side?

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Elias Levy
>
> The Java consumer fails to consume messages in a round robin fashion.  This 
> can lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant amount 
> of time consuming messages off a topic.  For this reason, we are using the 
> pause/poll/resume pattern to ensure the consumer session does not time out.  
> The topic that is being consumed has been preloaded with messages.  That means 
> there is a significant message lag when the consumer is first started.  To 
> limit how many messages are consumed at a time, the consumer has been 
> configured with max.poll.records=1.
> The first initial observation is that the client receive a large batch of 
> messages for the first partition it decides to consume from and will consume 
> all those messages before moving on, rather than returning a message from a 
> different partition for each call to poll.
> We solved this issue by configuring max.partition.fetch.bytes to be small 
> enough that only a single message will be returned by the broker on each 
> fetch, although this would not be feasible if message size were highly 
> variable.
> The behavior of the consumer after this change is to largely consume from a 
> small number of partitions, usually just two, iterating between them, until 
> it exhausts them, before moving to another partition.   This behavior is 
> problematic if the messages have some rough time semantics and need to be 
> process roughly time ordered across all partitions.
> It would be useful if the consumer has a pluggable API that allowed custom 
> logic to select which partition to consume from next, thus enabling the 
> creation of a round robin partition consumer.
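The pluggable round-robin selection the reporter asks for could look roughly like this Kafka-free sketch over per-partition buffers (class and method names are illustrative only):

```java
import java.util.List;
import java.util.Queue;

// Emits at most one record per partition per pass, rotating across the
// non-empty partition buffers instead of draining one partition at a time.
public final class RoundRobinDrainer<T> {
    private final List<Queue<T>> partitions;
    private int next = 0;

    public RoundRobinDrainer(List<Queue<T>> partitions) {
        this.partitions = partitions;
    }

    /** Return the next record, rotating across partitions; null if all are empty. */
    public T poll() {
        for (int i = 0; i < partitions.size(); i++) {
            Queue<T> q = partitions.get(next);
            next = (next + 1) % partitions.size();  // advance even when empty
            if (!q.isEmpty()) {
                return q.poll();
            }
        }
        return null;
    }
}
```

A pluggable consumer API would presumably let such a policy choose the next partition to return records from on each poll.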





Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-07 Thread Guozhang Wang
Hello,

The problem is that, in stream processing, when you have a new record coming
from the big table tB that does not find a match in the small table tA (which
in practice holds only a subset of the keys), you cannot tell which of
the following cases is true:

1. there was a matching record in tA, which gets deleted before this tB
record arrives. In this case we need to output a null record indicating to
"negate" the previous join result.
2. there was never a matching record in tA. In this case we actually do not
need to output anything.

As I mentioned, the root cause is that tables in stream processing can be
updated while the join is on-going. As for the Kafka Streams implementation, if a
null record is received in the downstream operators while there is no
existing record for that key in the materialized view, then it is treated
as a no-op.
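That no-op rule can be illustrated with a tiny stand-in for a downstream materialized view (hedged: illustrative only, not Kafka Streams internals):

```java
import java.util.HashMap;
import java.util.Map;

// Applies changelog records to a materialized view. A null value is a
// tombstone: it negates a previously stored join result (case 1 above),
// but if no result was ever stored for the key it is simply a no-op (case 2).
public final class MaterializedView<K, V> {
    private final Map<K, V> view = new HashMap<>();

    /** Apply one changelog record; returns true if the view actually changed. */
    public boolean apply(K key, V value) {
        if (value == null) {
            return view.remove(key) != null;  // no-op when the key is absent
        }
        view.put(key, value);
        return true;
    }

    public V get(K key) {
        return view.get(key);
    }
}
```

This is why emitting {join-key, null} for keys that never matched is harmless downstream, even though it costs some traffic.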


Guozhang


On Wed, Jul 6, 2016 at 3:59 AM, Philippe Derome  wrote:

> thanks, I understand that it's not modelled the same way as database joins.
>
> Let's take an example of inner join of very small population set (NBA
> rookies or US senators) with larger table (data on zip codes). Let's assume
> we want to identify the crime rate of zip codes where current senators live
> or the median income of zip codes where NBA rookies currently live. These
> small elite population samples will likely never live in the 50% poorest
> zip codes in US (although exceptionally some might) and NBA rookies will
> not live far from their team home base (Maine, Alaska, Hawaii, North
> Dakota) so many zip codes will not match and are expected to never match.
> So, I don't see that the keys representing such zip codes will become
> eventually consistent.
>
> One can imagine an application that makes case of census data (with many
> zip codes) and interested in many such statistics for several such small
> "elite" populations and then the irrelevant zip codes with null records
> find their ways multiple times in the data pipeline.
>
> I cannot think of a more egregious example where one table has billions of
> keys and the other only a handful that would match but I'd assume that such
> use cases could be natural.
>
> It seems to me that the null keys should be output to represent a record
> deletion in the resulting table, but not a near miss on data selection.
>
> On Tue, Jul 5, 2016 at 12:44 AM, Guozhang Wang  wrote:
>
> > Hello,
> >
> > The KTable join semantics is not exactly the same with that of a RDBMS.
> You
> > can find detailed semantics in the web docs (search for Joining Streams):
> >
> >
> >
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl
> >
> > In a nutshell, the joiner will be triggered only if both / left / either
> of
> > the joining streams has the matching record with the key of the incoming
> > received record (so the input values of the joiner could not be null /
> can
> > be null for only the other value / can be null on either values, but not
> > both), and otherwise a pair of {join-key, null} is output. We made this
> > design deliberately just to make sure that "table-table joins are
> > eventually consistent". This gives a kind of resilience to late arrival
> of
> > records that a late arrival in either stream can "update" the join
> result.
> >
> >
> > Guozhang
> >
> > On Mon, Jul 4, 2016 at 6:10 PM, Philippe Derome 
> > wrote:
> >
> > > Same happens for regular join, keys that appear only in one stream will
> > > make it to output KTable tC with a null for either input stream. I
> guess
> > > it's related to Kafka-3911 Enforce ktable Materialization or umbrella
> > JIRA
> > > 3909, Queryable state for Kafka Streams?
> > >
> > > On Mon, Jul 4, 2016 at 8:45 PM, Philippe Derome 
> > > wrote:
> > >
> > > > If we have two streams A and B for which we associate tables tA and
> tB,
> > > > then create a table tC as ta.leftJoin(tB, ) and
> then
> > > we
> > > > have a key kB in stream B but never made it to tA nor tC, do we need
> to
> > > > inject a pair (k,v) of (kB, null) into resulting change log for tC ?
> > > >
> > > > It sounds like it is definitely necessary if key kB is present in
> table
> > > tC
> > > > but if not, why add it?
> > > >
> > > > I have an example that reproduces this and would like to know if it
> is
> > > > considered normal, sub-optimal, or a defect. I don't view it as
> normal
> > > for
> > > > time being, particularly considering stream A as having very few keys
> > > and B
> > > > as having many, which could lead to an unnecessary large change log
> for
> > > C.
> > > >
> > > > Phil
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-07 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366418#comment-15366418
 ] 

Guozhang Wang commented on KAFKA-3101:
--

[~bbejeck] Regarding KAFKA-3776, there is still one open question about 
caching, whether we should store objects or bytes, which is related to memory 
management. You can find the details about it in the linked discussion thread:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams#Discussion:MemoryManagementinKafkaStreams-OpenQuestions

So it will be great if you could first do some experiments comparing the 
performance of object size estimation (e.g. https://github.com/jbellis/jamm) 
vs. the serde cost, in order to make this decision first.

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> where computing the intermediate oldValue results could cost a lot of CPU overhead. 
> Instead if we can let the underlying state store to "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only get updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that, we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key if their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 
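The bookkeeping described above can be simulated with plain maps. This is an illustrative sketch, not the eventual Kafka Streams implementation; the emitInterval knob and the string encoding of Change<newValue, oldValue> are assumptions standing in for the real configs and types.

```java
import java.util.HashMap;
import java.util.Map;

public class EmitSuppressingStore {
    // key -> [currentNewValue, lastEmittedOldValue]
    private final Map<String, String[]> store = new HashMap<>();
    // Emit at most once per `emitInterval` updates (illustrative config).
    private final int emitInterval;
    private final Map<String, Integer> updatesSinceEmit = new HashMap<>();

    EmitSuppressingStore(int emitInterval) { this.emitInterval = emitInterval; }

    // Returns the Change<newValue, oldValue> to emit, or null if suppressed.
    String update(String key, String newValue) {
        String[] pair = store.get(key);
        String lastEmittedOld = (pair == null) ? null : pair[1];
        int count = updatesSinceEmit.merge(key, 1, Integer::sum);
        if (pair == null || count >= emitInterval) {
            // Emit, and remember the just-emitted value as the new "old" value.
            store.put(key, new String[]{newValue, newValue});
            updatesSinceEmit.put(key, 0);
            return "<" + newValue + ", " + lastEmittedOld + ">";
        }
        // Update the new value only; keep the last emitted old value.
        store.put(key, new String[]{newValue, lastEmittedOld});
        return null;
    }

    public static void main(String[] args) {
        EmitSuppressingStore s = new EmitSuppressingStore(3);
        for (String v : new String[]{"V1", "V2", "V3", "V4", "V5"})
            System.out.println(v + " -> " + s.update("key", v));
        // V1 -> <V1, null>, V2 -> null, V3 -> null, V4 -> <V4, V1>, V5 -> null
    }
}
```

Replaying V1..V5 reproduces the example from the description: two emits instead of five, with <V5, V4> left pending for the "closing" time control.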



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-67: Queryable state for Kafka Streams

2016-07-07 Thread Sriram Subramanian
+1

On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai 
wrote:

> +1
>
> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll  wrote:
>
> > +1 (non-binding)
> >
> > On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy 
> wrote:
> >
> > > Thanks Henry - we've updated the KIP with an example and the new config
> > > parameter required. FWIW the user doesn't register a listener, they
> > provide
> > > a host:port in config. It is expected they will start a service running
> > on
> > > that host:port that they can use to connect to the running KafkaStreams
> > > Instance.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 7 Jul 2016 at 06:06 Henry Cai 
> > wrote:
> > >
> > > > It wasn't quite clear to me how the user program interacts with the
> > > > discovery API, especially on the user supplied listener part, how
> does
> > > the
> > > > user program supply that listener to KafkaStreams and how does
> > > KafkaStreams
> > > > know which port the user listener is running, maybe a more complete
> > > > end-to-end example including the steps on registering the user
> listener
> > > and
> > > > whether the user listener needs to be involved with task
> reassignment.
> > > >
> > > >
> > > > On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > +1
> > > > >
> > > > > On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy 
> > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to initiate the voting process for KIP-67
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > > > > > >
> > > > > >
> > > > > > KAFKA-3909  is
> > the
> > > > top
> > > > > > level JIRA for this effort.
> > > > > >
> > > > > > Initial PRs for Step 1 of the process are:
> > > > > > Expose State Store Names <
> > https://github.com/apache/kafka/pull/1526>
> > > > and
> > > > > > Query Local State Stores <
> > https://github.com/apache/kafka/pull/1565>
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Best regards,
> > Michael Noll
> >
> >
> >
> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860*
> > *Download Apache Kafka and Confluent Platform: www.confluent.io/download*
> >
>


Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-07 Thread Philippe Derome
Oh ok, I had assumed that distinguishing cases 1 and 2 was possible.

Thanks.
On 7 Jul 2016 1:02 p.m., "Guozhang Wang"  wrote:

> Hello,
>
> The problem is that, in stream processing when you have a new record coming
> from the big table tB and does not find a match with the small table (i.e.
> only a sub set of keys in practice) in tA, you cannot tell which case of
> the following is true:
>
> 1. there was a matching record in tA, which gets deleted before this tB
> record arrives. In this case we need to output a null record indicating to
> "negate" the previous join result.
> 2. there was never a matching record in tA. In this case we actually do not
> need to output anything.
>
> As I mentioned, the root cause is that tables in stream processing can be
> updated while processing is on-going. As for the Kafka Streams implementation, if a
> null record is received in the downstream operators while there is no
> existing record for that key in the materialized view, then it is treated
> as a no-op.
>
>
> Guozhang
>
>
> On Wed, Jul 6, 2016 at 3:59 AM, Philippe Derome 
> wrote:
>
> > thanks, I understand that it's not modelled the same way as database
> joins.
> >
> > Let's take an example of inner join of very small population set (NBA
> > rookies or US senators) with larger table (data on zip codes). Let's
> assume
> > we want to identify the crime rate of zip codes where current senators
> live
> > or the median income of zip codes where NBA rookies currently live. These
> > small elite population samples will likely never live in the 50% poorest
> > zip codes in US (although exceptionally some might) and NBA rookies will
> > not live far from their team home base (Maine, Alaska, Hawaii, North
> > Dakota) so many zip codes will not match and are expected to never match.
> > So, I don't see that the keys representing such zip codes will become
> > eventually consistent.
> >
> > One can imagine an application that makes use of census data (with many
> > zip codes) and is interested in many such statistics for several such small
> > "elite" populations, and then the irrelevant zip codes with null records
> > find their way multiple times into the data pipeline.
> >
> > I cannot think of a more egregious example where one table has billions
> of
> > keys and the other only a handful that would match but I'd assume that
> such
> > use cases could be natural.
> >
> > It seems to me that the null keys should be output to represent a record
> > deletion in the resulting table, but not a near miss on data selection.
> >
> > On Tue, Jul 5, 2016 at 12:44 AM, Guozhang Wang 
> wrote:
> >
> > > Hello,
> > >
> > > The KTable join semantics is not exactly the same with that of a RDBMS.
> > You
> > > can find detailed semantics in the web docs (search for Joining
> Streams):
> > >
> > >
> > >
> >
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl
> > >
> > > In a nutshell, the joiner will be triggered only if both / left /
> either
> > of
> > > the joining streams has the matching record with the key of the
> incoming
> > > received record (so the input values of the joiner could not be null /
> > can
> > > be null for only the other value / can be null on either values, but
> not
> > > both), and otherwise a pair of {join-key, null} is output. We made this
> > > design deliberately just to make sure that "table-table joins are
> > > eventually consistent". This gives a kind of resilience to late arrival
> > of
> > > records that a late arrival in either stream can "update" the join
> > result.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Jul 4, 2016 at 6:10 PM, Philippe Derome 
> > > wrote:
> > >
> > > > Same happens for regular join, keys that appear only in one stream
> will
> > > > make it to output KTable tC with a null for either input stream. I
> > guess
> > > > it's related to Kafka-3911 Enforce ktable Materialization or umbrella
> > > JIRA
> > > > 3909, Queryable state for Kafka Streams?
> > > >
> > > > On Mon, Jul 4, 2016 at 8:45 PM, Philippe Derome 
> > > > wrote:
> > > >
> > > > > If we have two streams A and B for which we associate tables tA and
> > tB,
> > > > > then create a table tC as ta.leftJoin(tB, ) and
> > then
> > > > we
> > > > > have a key kB in stream B but never made it to tA nor tC, do we
> need
> > to
> > > > > inject a pair (k,v) of (kB, null) into resulting change log for tC
> ?
> > > > >
> > > > > It sounds like it is definitely necessary if key kB is present in
> > table
> > > > tC
> > > > > but if not, why add it?
> > > > >
> > > > > I have an example that reproduces this and would like to know if it
> > is
> > > > > considered normal, sub-optimal, or a defect. I don't view it as
> > normal
> > > > for
> > > > > time being, particularly considering stream A as having very few
> keys
> > > > and B
> > > > > as having many, which could lead to an unnecessary large change log
> > for
> > > > C.
> > > > >
> > > > > Phil
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > --

[jira] [Created] (KAFKA-3935) ConnectDistributedTest.test_restart_failed_task.connector_type=sink system test failing

2016-07-07 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-3935:


 Summary: 
ConnectDistributedTest.test_restart_failed_task.connector_type=sink system test 
failing
 Key: KAFKA-3935
 URL: https://issues.apache.org/jira/browse/KAFKA-3935
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Ewen Cheslack-Postava
Assignee: Jason Gustafson


This has failed a few times, see e.g. 
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-07-07--001.1467911236--apache--trunk--efc4c88/report.html
 Note that it is *only* the sink task version, the source task one works ok.

{code}

test_id:
2016-07-06--001.kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_restart_failed_task.connector_type=sink
status: FAIL
run time:   1 minute 10.991 seconds


Failed to see task transition to the FAILED state
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
 line 106, in run_all_tests
data = self.run_single_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
 line 162, in run_single_test
return self.current_test_context.function(self.current_test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py",
 line 331, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
 line 175, in test_restart_failed_task
err_msg="Failed to see task transition to the FAILED state")
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape/utils/util.py",
 line 36, in wait_until
raise TimeoutError(err_msg)
TimeoutError: Failed to see task transition to the FAILED state
{code}

I checked the worker logs and it does look like we're seeing the exception:

{code}
[2016-07-06 15:22:19,061] ERROR Task mock-sink-0 threw an uncaught and 
unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.RuntimeException
at org.apache.kafka.connect.tools.MockSinkTask.put(MockSinkTask.java:58)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:228)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
[2016-07-06 15:22:19,062] ERROR Task is being killed and will not recover until 
manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2016-07-06 15:22:19,062] INFO WorkerSinkTask{id=mock-sink-0} Committing 
offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2016-07-06 15:22:19,065] DEBUG Group connect-mock-sink committed offset 0 for 
partition test-0 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2016-07-06 15:22:19,065] DEBUG Finished WorkerSinkTask{id=mock-sink-0} offset 
commit successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2016-07-06 15:22:19,065] ERROR Task mock-sink-0 threw an uncaught and 
unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to 
unrecoverable exception.
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:228)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurren

[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366526#comment-15366526
 ] 

Ismael Juma commented on KAFKA-3933:


Shall I assign the JIRA to you?

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflateInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.
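The deterministic-close discipline the report argues for can be shown with the JDK alone. Try-with-resources gives exactly the explicit close() that a ClosableIterator would have to provide, releasing zlib's native buffers without waiting for the finalizer; the class and method names below are illustrative, and the in-memory byte arrays stand in for a compressed log segment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipClose {
    // Compress bytes in memory (a pure-JDK stand-in for a gzip'd segment).
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(data);
        } // close() frees the native Deflater buffers immediately
        return bos.toByteArray();
    }

    // Read only the first byte, as recovery reads only the first offset,
    // but still close the stream deterministically instead of relying on
    // GC and the finalizer to release native memory.
    static int firstByte(byte[] gzipped) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            return in.read();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = gzip("hello".getBytes("UTF-8"));
        System.out.println(firstByte(compressed)); // 104, i.e. 'h'
    }
}
```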



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Client Side Auto Topic Creation

2016-07-07 Thread Guozhang Wang
I agree with Grant and Ismael regarding 1 / 2 / 3, and for 2) I would
prefer having the default configs on the client side (i.e. like what we did
in the kafka-topics script today) than on the cluster side, where the
AdminClient will auto-set the configs if they are not specified by the user
triggering its function, so that:

a) when you use the AdminClient in a command line tool / etc, you would
more likely to be aware of the default setting you are living with even
when you do not care.

b) when you use the AdminClient programmatically in your client development,
you are more likely to think of the configs to choose with the exposed APIs.


Guozhang

On Fri, Jul 1, 2016 at 4:09 AM, Ismael Juma  wrote:

> Hi all,
>
> I think there are a few things being discussed and it would be good to make
> that explicit:
>
> 1. If and how we expose auto-topic creation in the client (under the
> assumption that the server auto-topic creation will be deprecated and
> eventually removed)
> 2. The ability to create topics with the cluster defaults for replication
> factor and partition counts
> 3. Support for topic "specs"
> 4. The fact that some exceptions are retriable in some cases, but not
> others
>
> My thoughts on each:
>
> 1. I prefer the approach where we throw an exception and let the clients
> create the topic via `AdminClient` if that's what they need.
> 2. Like Grant, I'm unsure that will generally be used in a positive way.
> However, if this is what we need to be able to deprecate server auto-topic
> creation, the benefits outweigh the costs in my opinion.
> 3. Something like this would be good to have and could potentially provide
> a better solution than 2. However, it needs a separate KIP and may take a
> while for the final design to be agreed. So, it should not prevent progress
> from being made in my opinion.
> 4. This has come up before. Encoding whether an exception is retriable or
> not via inheritance is a bit restrictive. Also, something that should be
> discussed separately, probably.
>
> Ismael
>
> On Wed, Jun 29, 2016 at 10:37 PM, Grant Henke  wrote:
>
> > Hi Roger and Constantine,
> >
> > Thanks for the feedback.
> >
> > I agree that configuration to maintain guarantees is commonly spread
> across
> > enterprise teams, making it difficult to get right. That said its also
> hard
> > to solve for every company structure too. I think there is room for an
> open
> > discussion about what configs should be able to be
> > validated/enforced/overridden and where configurations should live. I
> think
> > thats big enough for a whole new KIP and would like to push that
> discussion
> > out until that KIP is opened. These discussions would also make sense in
> > KIP-37
> > - Add Namespaces to Kafka
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-37+-+Add+Namespaces+to+Kafka
> > >.
> > To ensure we allow validation and overrides at the namespace level.
> >
> > That said, KIP-4 will be introducing a config request/response protocol
> >  and adding call to get/alter configs to the admin api. You could
> leverage
> > that to do some of the client validation and defaulting based on your
> > needs. Look for a discussion thread from me on that soon.
> >
> > As far as auto topic creation goes, it sounds like failing fast and
> > allowing the client application to create the topic would provide the
> most
> > flexibility to ensure the topic matches its needed specifications.
> >
> > Thanks,
> > Grant
> >
> > On Wed, Jun 29, 2016 at 3:02 PM, Konstantin Zadorozhny <
> > konstantin.zadoroz...@tubemogul.com> wrote:
> >
> > > Roger,
> > >
> > > I concur with everything you said.
> > >
> > > Couple more use cases to prove the point:
> > >
> > >1. Some topics should always have 1 and only one partition.
> > >2. CDC application based on Kafka Connect. Those type of application
> > >absolutely must know how to create properly configured topics:
> > > compacted, 1
> > >partition, replication factor 3, 2 min in sync replicas. In many
> cases
> > > per
> > >table or per database configuration overrides will be useful too.
> > >
> > > If producer and consumer are able to verify topic configuration on
> > startup
> > > would be really useful. A spec would be great way to document the
> intent
> > of
> > > the code. A lot of silly (but quite hard to pin down) production issues
> > > could have been prevented by having producer to fail fast on
> > misconfigured
> > > topics.
> > >
> > > To add to the auto-creation configuration tally. We do have topic
> > > auto-creation disabled on all our clusters.
> > >
> > > *Konstantin Zadorozhny*
> > > www.tubemogul.com
> > >
> > > On Wed, Jun 29, 2016 at 11:17 AM, Roger Hoover  >
> > > wrote:
> > >
> > > > My comments go a bit beyond just topic creation but I'd like to see
> > Kafka
> > > > make it easier for application developers to specify their
> requirements
> > > > declaratively in a single place.  Today, for example, if your
> > application
> > > 

[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN

2016-07-07 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366584#comment-15366584
 ] 

Thomas Weise commented on KAFKA-1754:
-

KOYA is now part of Slider:

https://github.com/apache/incubator-slider/tree/develop/app-packages/kafka


> KOYA - Kafka on YARN
> 
>
> Key: KAFKA-1754
> URL: https://issues.apache.org/jira/browse/KAFKA-1754
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Thomas Weise
> Attachments: DT-KOYA-Proposal- JIRA.pdf
>
>
> YARN (Hadoop 2.x) has enabled clusters to be used for a variety of workloads, 
> emerging as distributed operating system for big data applications. 
> Initiatives are on the way to bring long running services under the YARN 
> umbrella, leveraging it for centralized resource management and operations 
> ([YARN-896] and examples such as HBase, Accumulo or Memcached through 
> Slider). This JIRA is to propose KOYA (Kafka On Yarn), a YARN application 
> master to launch and manage Kafka clusters running on YARN. Brokers will use 
> resources allocated through YARN with support for recovery, monitoring etc. 
> Please see attached for more details.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1597: KAFKA-3817 Follow-up: Avoid forwarding old value i...

2016-07-07 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1597

KAFKA-3817 Follow-up: Avoid forwarding old value if it is null in 
KTableRepartition

Also handle Null value in SmokeTestUtil.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KHotfix-check-null

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1597.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1597


commit 1b73da4110be9c233e7f20ea306a51b1448cbfa4
Author: Guozhang Wang 
Date:   2016-07-07T18:50:44Z

Do not forward old value if it is null in KTableRepartition; also handle 
Null value in SmokeTestUtil




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3817) KTableRepartitionMap should handle null inputs

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366589#comment-15366589
 ] 

ASF GitHub Bot commented on KAFKA-3817:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/1597

KAFKA-3817 Follow-up: Avoid forwarding old value if it is null in 
KTableRepartition

Also handle Null value in SmokeTestUtil.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KHotfix-check-null

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1597.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1597


commit 1b73da4110be9c233e7f20ea306a51b1448cbfa4
Author: Guozhang Wang 
Date:   2016-07-07T18:50:44Z

Do not forward old value if it is null in KTableRepartition; also handle 
Null value in SmokeTestUtil




> KTableRepartitionMap should handle null inputs
> --
>
> Key: KAFKA-3817
> URL: https://issues.apache.org/jira/browse/KAFKA-3817
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Jeff Klukas
>Assignee: Guozhang Wang
> Fix For: 0.10.0.1
>
>
> When calling {{KTable.groupBy}} on the result of a KTable-KTable join, NPEs 
> are raised:
> {{org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$
> > KTableMapProcessor.process(KTableRepartitionMap.java:88)}}
> The root cause is that the join is expected to emit null values when no match 
> is found, but KTableRepartitionMap is not set up to handle this case.
> On the users email list, [~guozhang] described a plan of action:
> I think this is actually a bug in KTableRepartitionMap
> that it actually should expect null grouped keys; this would be a
> straight-forward fix for this operator, but I can make a pass over all the
> repartition operators just to make sure they are all gracefully handling
> null keys.
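The graceful null handling described in the plan reduces to a one-line guard before applying the groupBy selector; the sketch below is illustrative only, not the actual KTableRepartitionMap fix.

```java
import java.util.function.BiFunction;

public class NullSafeRepartition {
    // Apply a groupBy selector to one side (old or new) of a Change record,
    // returning null instead of throwing NPE when that side is absent.
    static <K, V, R> R mapIfNotNull(BiFunction<K, V, R> selector, K key, V value) {
        return value == null ? null : selector.apply(key, value);
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, String> selector = (k, v) -> k + ":" + v;
        System.out.println(mapIfNotNull(selector, "a", 1));    // a:1
        System.out.println(mapIfNotNull(selector, "a", null)); // null
    }
}
```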



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


consumer.subscribe(Pattern p , ..) method fails with Authorizer

2016-07-07 Thread Manikumar Reddy
Hi,

The consumer.subscribe(Pattern p, ..) method implementation tries to get
metadata for all the topics.
This will throw TopicAuthorizationException on internal topics and other
unauthorized topics.
We may need to move the pattern matching to the server side.
Is this a known issue? If not, I will raise a JIRA.

logs:
[2016-07-07 22:48:06,317] WARN Error while fetching metadata with
correlation id 1 : {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED}
(org.apache.kafka.clients.NetworkClient)
[2016-07-07 22:48:06,318] ERROR Unknown error when running consumer:
 (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized
to access topics: [__consumer_offsets]


Thanks,
Manikumar
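One client-side workaround for the issue above is to match the subscription regex locally against a list of topics the principal is actually authorized for, and then subscribe to that explicit list, rather than letting subscribe(Pattern) fetch metadata for every topic. A minimal sketch of the local matching step (topic names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternFilter {
    // Filter a known topic list by the subscription regex client-side,
    // so unauthorized/internal topics are never touched.
    static List<String> match(List<String> topics, Pattern p) {
        return topics.stream()
                     .filter(t -> p.matcher(t).matches())
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders-eu", "orders-us", "__consumer_offsets");
        // Internal topics such as __consumer_offsets never match the
        // pattern, so no TopicAuthorizationException is triggered on them.
        System.out.println(match(topics, Pattern.compile("orders-.*")));
    }
}
```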


[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-07-07 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366591#comment-15366591
 ] 

Bill Bejeck commented on KAFKA-3101:


[~guozhang] ok will do.  

> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>,
> which could cost a lot of CPU overhead computing the intermediate results. 
> Instead, if we let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only get updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that, we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key if their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 
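The store layout proposed above can be modeled in a few lines. This is a toy simulation, not the Streams state-store API: per key it tracks both the latest aggregate and the last *emitted* old value, and only emits when some policy fires (here, every third update, purely for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmitSuppression {
    static class Entry { String newValue; String lastEmitted; }
    final Map<String, Entry> store = new HashMap<>();
    final List<String> emitted = new ArrayList<>();
    int updates = 0;

    void update(String key, String agg) {
        Entry e = store.computeIfAbsent(key, k -> new Entry());
        e.newValue = agg;                 // always update the new value
        updates++;
        if (updates % 3 == 1) {           // stand-in emit policy
            emitted.add("<" + agg + ", " + e.lastEmitted + ">");
            e.lastEmitted = agg;          // remember what went downstream
        }
    }

    public static void main(String[] args) {
        EmitSuppression es = new EmitSuppression();
        for (String v : new String[]{"V1", "V2", "V3", "V4", "V5"}) es.update("k", v);
        // Only V1 and V4 are emitted; V2, V3 and V5 merely update the
        // stored newValue, matching the sequence in the description.
        System.out.println(es.emitted);
    }
}
```

A real implementation would also need the "closing" timer mentioned above to flush not-yet-emitted pairs such as <V5, V4>.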



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Crayford reassigned KAFKA-3933:
---

Assignee: Tom Crayford

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Assignee: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits a 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflateInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.
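The `ClosableIterator` idea from the ticket can be sketched in a self-contained way. The interface and GZIP-backed iterator below are illustrative names, not Kafka's actual classes; the point is that a caller that stops early (as log recovery does) can still release zlib's native buffers deterministically via try-with-resources:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CloseableIteratorDemo {
    // An Iterator that is also AutoCloseable, so partial consumption
    // still frees the underlying stream's native memory.
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override void close() throws IOException;
    }

    static CloseableIterator<Integer> gzipBytes(byte[] gz) throws IOException {
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gz));
        return new CloseableIterator<Integer>() {
            Integer next = read();
            Integer read() {
                try { int b = in.read(); return b < 0 ? null : b; }
                catch (IOException e) { throw new UncheckedIOException(e); }
            }
            public boolean hasNext() { return next != null; }
            public Integer next() { Integer b = next; next = read(); return b; }
            public void close() throws IOException { in.close(); }  // frees zlib buffers now
        };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) { out.write(new byte[]{7, 8, 9}); }
        // Read only the first element, then close -- mirroring recovery
        // reading just the first offset without draining the stream.
        try (CloseableIterator<Integer> it = gzipBytes(buf.toByteArray())) {
            System.out.println(it.next());
        }
    }
}
```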



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366721#comment-15366721
 ] 

Tom Crayford commented on KAFKA-3933:
-

Done. I hope to work on a fix tomorrow.

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Assignee: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Client Side Auto Topic Creation

2016-07-07 Thread Jun Rao
It seems that it makes sense for the writer to trigger auto topic creation,
but not the reader. So, my preference is Jay's option #1: add a new
configuration to enable topic creation on the producer side that defaults to
true. If the topic doesn't exist, the producer will send a
createTopicRequest and pick up the broker side defaults for #partitions and
replication factor. This matches the current behavior and won't surprise
people. People who want to enforce manual topic creation can disable auto
topic creation on the producer.

On the consumer side, throwing an exception to the client when a topic
doesn't exist probably works for most cases. I am wondering if there is a
case where a user really wants to start the consumer before the topic is
created.

Thanks,

Jun


On Fri, Jul 1, 2016 at 4:09 AM, Ismael Juma  wrote:

> Hi all,
>
> I think there are a few things being discussed and it would be good to make
> that explicit:
>
> 1. If and how we expose auto-topic creation in the client (under the
> assumption that the server auto-topic creation will be deprecated and
> eventually removed)
> 2. The ability to create topics with the cluster defaults for replication
> factor and partition counts
> 3. Support for topic "specs"
> 4. The fact that some exceptions are retriable in some cases, but not
> others
>
> My thoughts on each:
>
> 1. I prefer the approach where we throw an exception and let the clients
> create the topic via `AdminClient` if that's what they need.
> 2. Like Grant, I'm unsure that will generally be used in a positive way.
> However, if this is what we need to be able to deprecate server auto-topic
> creation, the benefits outweigh the costs in my opinion.
> 3. Something like this would be good to have and could potentially provide
> a better solution than 2. However, it needs a separate KIP and may take a
> while for the final design to be agreed. So, it should not prevent progress
> from being made in my opinion.
> 4. This has come up before. Encoding whether an exception is retriable or
> not via inheritance is a bit restrictive. Also, something that should be
> discussed separately, probably.
>
> Ismael
>
> On Wed, Jun 29, 2016 at 10:37 PM, Grant Henke  wrote:
>
> > Hi Roger and Constantine,
> >
> > Thanks for the feedback.
> >
> > I agree that configuration to maintain guarantees is commonly spread
> across
> > enterprise teams, making it difficult to get right. That said, it's also
> hard
> > to solve for every company structure too. I think there is room for an
> open
> > discussion about what configs should be able to be
> > validated/enforced/overridden and where configurations should live. I
> think
> > that's big enough for a whole new KIP and would like to push that
> discussion
> > out until that KIP is opened. These discussions would also make sense in
> > KIP-37
> > - Add Namespaces to Kafka
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-37+-+Add+Namespaces+to+Kafka
> > >.
> > To ensure we allow validation and overrides at the namespace level.
> >
> > That said, KIP-4 will be introducing a config request/response protocol
> >  and adding call to get/alter configs to the admin api. You could
> leverage
> > that to do some of the client validation and defaulting based on your
> > needs. Look for a discussion thread from me on that soon.
> >
> > As far as auto topic creation goes, it sounds like failing fast and
> > allowing the client application to create the topic would provide the
> most
> > flexibility to ensure the topic matches its needed specifications.
> >
> > Thanks,
> > Grant
> >
> > On Wed, Jun 29, 2016 at 3:02 PM, Konstantin Zadorozhny <
> > konstantin.zadoroz...@tubemogul.com> wrote:
> >
> > > Roger,
> > >
> > > I concur with everything you said.
> > >
> > > Couple more use cases to prove the point:
> > >
> > >1. Some topics should always have 1 and only one partition.
> > >2. CDC application based on Kafka Connect. Those type of application
> > >absolutely must know how to create properly configured topics:
> > > compacted, 1
> > >partition, replication factor 3, 2 min in sync replicas. In many
> cases
> > > per
> > >table or per database configuration overrides will be useful too.
> > >
> > > If the producer and consumer were able to verify topic configuration on
> > startup,
> > > that would be really useful. A spec would be a great way to document the
> intent
> > of
> > > the code. A lot of silly (but quite hard to pin down) production issues
> > > could have been prevented by having producer to fail fast on
> > misconfigured
> > > topics.
> > >
> > > To add to the auto-creation configuration tally. We do have topic
> > > auto-creation disabled on all our clusters.
> > >
> > > *Konstantin Zadorozhny*
> > > www.tubemogul.com
> > >
> > > On Wed, Jun 29, 2016 at 11:17 AM, Roger Hoover  >
> > > wrote:
> > >
> > > > My comments go a bit beyond just topic creation but I'd like to see
> > Kafka
> > > > make it easier for application developers to

[jira] [Commented] (KAFKA-3689) Exception when attempting to decrease connection count for address with no connections

2016-07-07 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366744#comment-15366744
 ] 

James Cheng commented on KAFKA-3689:


We are seeing this too. Kafka 0.10.0.0

I attached an excerpt from server.log around the time this started. The file is 
called 'KAFKA-3689.log.redacted'.  You can see some other errors leading up to 
the 'Attempting to decrease connection count' error.

[2016-07-06 20:07:46,003] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.NullPointerException
at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:486)
at 
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)
 

> Exception when attempting to decrease connection count for address with no 
> connections
> --
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Jun Rao
> Fix For: 0.10.1.0, 0.10.0.1
>
> Attachments: KAFKA-3689.log.redacted, kafka-3689-instrumentation.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> As per Ismael Juma's suggestion in email thread to us...@kafka.apache.org 
> with the same subject, I am creating this bug report.
> The following error occurs in one of the brokers in our 3 broker cluster, 
> which serves about 8000 topics. These topics are single partitioned with a 
> replication factor = 3. Each topic gets data at a low rate  – 200 bytes per 
> sec.  Leaders are balanced across the topics.
> Producers run from external servers (4 Ubuntu servers with same config as the 
> brokers), each producing to 2000 topics utilizing kafka-python library.
> This error message occurs repeatedly in one of the servers. Between the hours 
> of 10:30am and 1:30pm on 5/9/16, there were about 10 Million such 
> occurrences. This was right after a cluster restart.
> This is not the first time we got this error in this broker. In those 
> instances, error occurred hours / days after cluster restart.
> =
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144 (actual network address 
> masked)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at 
> kafka.network.ConnectionQuotas$$anonfun$9.apply(SocketServer.scala:565)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at kafka.network.ConnectionQuotas.dec(SocketServer.scala:564)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:450)
> at 
> kafka.network.Processor$$anonfun$run$13.apply(SocketServer.scala:445)
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.run(SocketServer.scala:445)
> at java.lang.Thread.run(Thread.java:745)
> [2016-05-09 10:38:43,932] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.IllegalArgumentException: Attempted to decrease connection count 
> for address with no connections, address: /X.Y.Z.144
> at 
> kafka.network.ConnectionQuotas$$anonfun$

[jira] [Updated] (KAFKA-3689) Exception when attempting to decrease connection count for address with no connections

2016-07-07 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng updated KAFKA-3689:
---
Attachment: KAFKA-3689.log.redacted

> Exception when attempting to decrease connection count for address with no 
> connections
> --
>
> Key: KAFKA-3689
> URL: https://issues.apache.org/jira/browse/KAFKA-3689
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.9.0.1
> Environment: ubuntu 14.04,
> java version "1.7.0_95"
> OpenJDK Runtime Environment (IcedTea 2.6.4) (7u95-2.6.4-0ubuntu0.14.04.2)
> OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
> 3 broker cluster (all 3 servers identical -  Intel Xeon E5-2670 @2.6GHz, 
> 8cores, 16 threads 64 GB RAM & 1 TB Disk)
> Kafka Cluster is managed by 3 server ZK cluster (these servers are different 
> from Kafka broker servers). All 6 servers are connected via 10G switch. 
> Producers run from external servers.
>Reporter: Buvaneswari Ramanan
>Assignee: Jun Rao
> Fix For: 0.10.1.0, 0.10.0.1
>
> Attachments: KAFKA-3689.log.redacted, kafka-3689-instrumentation.patch
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-55: Secure quotas for authenticated users

2016-07-07 Thread Tom Crayford
+1 (non-binding)

On Thursday, 7 July 2016, Rajini Sivaram 
wrote:

> I would like to initiate voting for KIP-55 (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users
> ).
> Since the KIP has changed quite a lot since the last vote, we will discard
> the previous vote and start this new voting thread.
>
> KIP-55 extends the existing client-id quota implementation to enable secure
> quotas for multi-user environments. The KIP proposes a flexible, unified
> design that supports quotas at <client-id>, <user> or <user, client-id>
> levels. It retains compatibility with the existing <client-id> quotas when
> new user level quotas are not configured.
>
> Thank you...
>
>
> Regards,
>
> Rajini
>


RE: Error after restarting kafka broker

2016-07-07 Thread Subhash Agrawal
Also including the Kafka users mailing list.
Has anybody seen this error? Any ideas on how to fix it?

Thanks
Subhash Agrawal.

From: Subhash Agrawal
Sent: Wednesday, July 06, 2016 5:12 PM
To: 'dev@kafka.apache.org'
Subject: Error after restarting kafka broker

Hi All,
I am running kafka broker (0.10.0 version) in standalone mode (Single kafka 
broker in the cluster).

After a couple of restarts, I see this error during embedded Kafka startup, even 
though it does not seem to be causing any problems.
Is there any way I can avoid this error?

Thanks
Subhash Agrawal

kafka.common.NoReplicaOnlineException: No replica for partition 
[__consumer_offsets,2] is alive. Live brokers are: [Set()], Assigned replicas 
are: [List(0)]
at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
at 
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:345)
at 
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:205)
at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at 
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
at 
kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:70)
at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:335)
at 
kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:166)
at 
kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
at 
kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:50)
at 
kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:48)
at 
kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:48)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at 
kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:48)
at 
kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:684)
at 
kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:680)
at 
kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:680)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at 
kafka.controller.KafkaController.startup(KafkaController.scala:680)
at kafka.server.KafkaServer.startup(KafkaServer.scala:200)


SSL: Broker works but consumer/producer fail

2016-07-07 Thread Vineet Kumar
Hi,
  I followed the Apache Kafka SSL instructions verbatim, but my producer and
consumer both hang or error out as follows.
Note that openssl s_client does work fine against the server below, yielding
certificates etc., confirming that the server itself can speak SSL.


*Producer and Consumer*
=

Config changes (client-ssl.properties)
---
security.protocol=SSL

% bin/kafka-console-*consumer*.sh --bootstrap-server 192.168.1.XXX:9093
--topic test --new-consumer --consumer.config config/client-ssl.properties
**

% bin/kafka-console-*producer*.sh --broker-list 192.168.1.XXX:9093 --topic
test --producer.config config/client-ssl.properties
a

**

[2016-07-07 16:35:57,670] ERROR Error when sending message to topic test
with key: null, value: 29 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata
after 6 ms.
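One common cause of exactly this symptom (the broker serving SSL fine via openssl while clients time out on metadata) is that the client config only sets security.protocol and never points at a truststore containing the broker's certificate, which is required when the certificate is self-signed or signed by a private CA. A sketch of the missing entries, with placeholder path and password:

```properties
# client-ssl.properties -- sketch; path and password are placeholders.
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=<truststore-password>
```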

*Broker*
==
 Config changes (server.properties)
 ---
listeners=SSL://192.168.1.XXX:9093
security.inter.broker.protocol=SSL
advertised.listeners=SSL://192.168.1.XXX:9093
ssl.keystore.location=/<..>/server.keystore.jks
ssl.keystore.password=
ssl.key.password=

% bin/kafka-*server*-start.sh config/server.properties
[2016-07-07 16:14:00,805] INFO Registered broker 0 at path /brokers/ids/0 with addresses: *SSL -> EndPoint(192.168.1.XXX,9093,SSL)* (kafka.utils.ZkUtils)
[2016-07-07 16:14:00,820] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-07-07 16:14:00,825] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2016-07-07 16:14:00,825] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser)
[2016-07-07 16:14:00,827] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

*Zookeeper*
=
 Config changes
 ---
  Nothing

 % bin/zookeeper-server-start.sh config/zookeeper.properties


[2016-07-07 16:13:18,002] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)


[2016-07-07 16:14:00,131] INFO Accepted socket connection from /127.0.0.1:41188 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-07-07 16:14:00,189] INFO Client attempting to establish new session at /127.0.0.1:41188 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-07 16:14:00,199] INFO Established session 0x155c7a306dc with negotiated timeout 6000 for client /127.0.0.1:41188 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-07-07 16:14:00,652] INFO Got user-level KeeperException when processing sessionid:0x155c7a306dc type:delete cxid:0x22 zxid:0xd6 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-07-07 16:14:00,778] INFO Got user-level KeeperException when processing sessionid:0x155c7a306dc type:create cxid:0x29 zxid:0xd7 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-07-07 16:14:00,778] INFO Got user-level KeeperException when processing sessionid:0x155c7a306dc type:create cxid:0x2a zxid:0xd8 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)


Kafka 0.10 Java Client TimeoutException: Batch containing 1 record(s) expired

2016-07-07 Thread Armin Kadiyan
Hi all,

I have a single node, multi (3) broker Zookeeper / Kafka setup. I am using the 
Kafka 0.10 Java client.

I wrote following simple remote (on a different Server than Kafka) Producer (in 
the code I replaced my public IP address with MYIP):

Properties config = new Properties();
try {
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<String, byte[]>(config);

    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(GATEWAY_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    // Fill in avroRecord
    byte[] bytes = recordInjection.apply(avroRecord);

    Future<RecordMetadata> future = producer.send(
            new ProducerRecord<String, byte[]>(datasetId + "", "testKey", bytes));
    RecordMetadata data = future.get();
} catch (Exception e) {
    e.printStackTrace();
}

My server properties for the 3 brokers look like this (across the three
properties files, broker.id is 0, 1, 2; listeners is PLAINTEXT://:9092,
PLAINTEXT://:9093, PLAINTEXT://:9094; and host.name is 10.2.0.4, 10.2.0.5,
10.2.0.6).
This is the first server properties file:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka1-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
host.name=10.2.0.4

When I execute the code, I get following exception:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
    at com.nr.roles.gateway.gw.service(gw.java:126)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
    at org.eclipse.jetty.server.Server.handle(Server.java:517)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0

Does anyone know what I am missing? Any help would be appreciated. Thanks a lot!
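One frequent cause of this exact exception (an assumption here, since the thread does not confirm it) is that the brokers advertise addresses the remote client cannot reach: with only host.name=10.2.0.x set, the metadata returned to the producer contains the internal 10.2.0.x addresses, so after the initial bootstrap connection every metadata request goes to unreachable hosts and queued batches expire. A sketch of the fix for broker 0, with MYIP standing in for the publicly reachable address as in the producer code (brokers 1 and 2 would use ports 9093 and 9094):

```properties
broker.id=0
listeners=PLAINTEXT://:9092
# Advertise the address remote clients can actually reach,
# instead of the internal host.name address
advertised.listeners=PLAINTEXT://MYIP:9092
```

This keeps the broker binding on all interfaces while handing remote clients a routable address in metadata responses.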

Best,
Armin Kadiyan