[jira] [Commented] (KAFKA-4497) log cleaner breaks on timeindex

2016-12-14 Thread Robert Schumann (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747829#comment-15747829
 ] 

Robert Schumann commented on KAFKA-4497:


[~becket_qin] [~junrao]: Many thanks for fixing it! Due to time pressure in our 
project we decided to downgrade to 0.10.0.1, so I'm currently not really able 
to test this. However, I think we will give 0.10.1.1 a try soon in some 
integration system and report back.

> log cleaner breaks on timeindex
> ---
>
> Key: KAFKA-4497
> URL: https://issues.apache.org/jira/browse/KAFKA-4497
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.1.0
> Environment: Debian Jessie, Oracle Java 8u92, kafka_2.11-0.10.1.0
>Reporter: Robert Schumann
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 0.10.1.1
>
>
> _created from ML entry by request of [~ijuma]_
> Hi all,
> we are facing an issue with latest kafka 0.10.1 and the log cleaner thread 
> with regards to the timeindex files. From the log of the log-cleaner we see 
> after startup that it tries to cleanup a topic called xdc_listing-status-v2 
> [1]. The topic is setup with log compaction [2] and the kafka cluster 
> configuration has log.cleaner enabled [3]. Looking at the log and the newly 
> created file [4], the cleaner seems to refer to tombstones prior to 
> epoch_time=0 - maybe because he finds messages, which don’t have a timestamp 
> at all (?). All producers and consumers are using 0.10.1 and the topics have 
> been created completely new, so I’m not sure, where this issue would come 
> from. The original timeindex file [5] seems to show only valid timestamps for 
> the mentioned offsets. I would also like to mention that the issue happened 
> in two independent datacenters at the same time, so I would rather expect an 
> application/producer issue instead of random disk failures. We didn’t have 
> the problem with 0.10.0 for around half a year, it appeared shortly after the 
> upgrade to 0.10.1.
> The puzzling message from the cleaner “cleaning prior to Fri Dec 02 16:35:50 
> CET 2016, discarding tombstones prior to Thu Jan 01 01:00:00 CET 1970” also 
> confuses me a bit. Does that mean, it does not find any log segments which 
> can be cleaned up or the last timestamp of the last log segment is somehow 
> broken/missing?
> I’m also a bit wondering, why the log cleaner thread stops completely after 
> an error with one topic. I would at least expect that it keeps on cleaning up 
> other topics, but apparently it doesn’t do that, e.g. it’s not even cleaning 
> the __consumer_offsets anymore.
> Does anybody have the same issues or can explain, what’s going on? Thanks for 
> any help or suggestions.
> Cheers
> Robert
> [1]
> {noformat}
> [2016-12-06 12:49:17,885] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,895] INFO [kafka-log-cleaner-thread-0], Starting  
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,947] INFO Cleaner 0: Beginning cleaning of log 
> xdc_listing-status-v2-1. (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,948] INFO Cleaner 0: Building offset map for 
> xdc_listing-status-v2-1... (kafka.log.LogCleaner)
> [2016-12-06 12:49:17,989] INFO Cleaner 0: Building offset map for log 
> xdc_listing-status-v2-1 for 1 segments in offset range [0, 194991). 
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,572] INFO Cleaner 0: Offset map for log 
> xdc_listing-status-v2-1 complete. (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,577] INFO Cleaner 0: Cleaning log 
> xdc_listing-status-v2-1 (cleaning prior to Fri Dec 02 16:35:50 CET 2016, 
> discarding tombstones prior to Thu Jan 01 01:00:00 CET 1970)... 
> (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,580] INFO Cleaner 0: Cleaning segment 0 in log 
> xdc_listing-status-v2-1 (largest timestamp Fri Dec 02 16:35:50 CET 2016) into 
> 0, retaining deletes. (kafka.log.LogCleaner)
> [2016-12-06 12:49:24,968] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> kafka.common.InvalidOffsetException: Attempt to append an offset (-1) to slot 
> 9 no larger than the last offset appended (11832) to 
> /var/lib/kafka/xdc_listing-status-v2-1/.timeindex.cleaned.
> at 
> kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply$mcV$sp(TimeIndex.scala:117)
> at 
> kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.scala:107)
> at 
> kafka.log.TimeIndex$$anonfun$maybeAppend$1.apply(TimeIndex.scala:107)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:107)
> at kafka.log.LogSegment.append(LogSegment.scala:106)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:518)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegm

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-14 Thread Rajini Sivaram
Hi Apurva,

Thank you for the answers. Just one follow-on.

15. Let me rephrase my original question. If all control messages (messages
to transaction logs and markers on user logs) were acknowledged only after
flushing the log segment, will transactions become durable in the
traditional sense (i.e. not restricted to min.insync.replicas failures) ?
This is not a suggestion to update the KIP. It seems to me that the design
enables full durability if required in the future with a rather
non-intrusive change. I just wanted to make sure I haven't missed anything
fundamental that prevents Kafka from doing this.



On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin  wrote:

> Hi Apurva,
>
> Thanks for the detailed answers... and sorry for the late reply!
>
> It does sound like, if the input-partitions-to-app-id mapping never
> changes, the existing fencing mechanisms should prevent duplicates. Great!
> I'm a bit concerned the proposed API will be delicate to program against
> successfully -- even in the simple case, we need to create a new producer
> instance per input partition, and anything fancier is going to need its own
> implementation of the Streams/Samza-style 'task' idea -- but that may be
> fine for this sort of advanced feature.
>
> For the second question, I notice that Jason also elaborated on this
> downthread:
>
> > We also looked at removing the producer ID.
> > This was discussed somewhere above, but basically the idea is to store
> the
> > AppID in the message set header directly and avoid the mapping to
> producer
> > ID altogether. As long as batching isn't too bad, the impact on total
> size
> > may not be too bad, but we were ultimately more comfortable with a fixed
> > size ID.
>
> ...which suggests that the distinction is useful for performance, but not
> necessary for correctness, which makes good sense to me. (Would a 128-bid
> ID be a reasonable compromise? That's enough room for a UUID, or a
> reasonable hash of an arbitrary string, and has only a marginal increase on
> the message size.)
>
> On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta  wrote:
>
> > Hi Ben,
> >
> > Now, on to your first question of how deal with consumer rebalances. The
> > short answer is that the application needs to ensure that the the
> > assignment of input partitions to appId is consistent across rebalances.
> >
> > For Kafka streams, they already ensure that the mapping of input
> partitions
> > to task Id is invariant across rebalances by implementing a custom sticky
> > assignor. Other non-streams apps can trivially have one producer per
> input
> > partition and have the appId be the same as the partition number to
> achieve
> > the same effect.
> >
> > With this precondition in place, we can maintain transactions across
> > rebalances.
> >
> > Hope this answers your question.
> >
> > Thanks,
> > Apurva
> >
> > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin  wrote:
> >
> > > Thanks for this! I'm looking forward to going through the full proposal
> > in
> > > detail soon; a few early questions:
> > >
> > > First: what happens when a consumer rebalances in the middle of a
> > > transaction? The full documentation suggests that such a transaction
> > ought
> > > to be rejected:
> > >
> > > > [...] if a rebalance has happened and this consumer
> > > > instance becomes a zombie, even if this offset message is appended in
> > the
> > > > offset topic, the transaction will be rejected later on when it tries
> > to
> > > > commit the transaction via the EndTxnRequest.
> > >
> > > ...but it's unclear to me how we ensure that a transaction can't
> complete
> > > if a rebalance has happened. (It's quite possible I'm missing something
> > > obvious!)
> > >
> > > As a concrete example: suppose a process with PID 1 adds offsets for
> some
> > > partition to a transaction; a consumer rebalance happens that assigns
> the
> > > partition to a process with PID 2, which adds some offsets to its
> current
> > > transaction; both processes try and commit. Allowing both commits would
> > > cause the messages to be processed twice -- how is that avoided?
> > >
> > > Second: App IDs normally map to a single PID. It seems like one could
> do
> > > away with the PID concept entirely, and just use App IDs in most places
> > > that require a PID. This feels like it would be significantly simpler,
> > > though it does increase the message size. Are there other reasons why
> the
> > > App ID / PID split is necessary?
> > >
> > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have just created KIP-98 to enhance Kafka with exactly once
> delivery
> > > > semantics:
> > > >
> > > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > >  > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
> > > >
> > > > This KIP adds a transactional messaging mechanism along wi

Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-12-14 Thread Manikumar
That's a good idea. Authenticating every request with delegation token will
be useful for
impersonation use-cases. But as of now, we are thinking delegation token as
just another way
to authenticate the users. We haven't think through all the use cases
related to
impersonation or using delegation token for impersonation. We want to
handle impersonation
(KAFKA-3712) as part of separate KIP.

Will that be Ok?


On Wed, Dec 14, 2016 at 8:09 AM, Gwen Shapira  wrote:

> Thinking out loud here:
>
> It looks like authentication with a delegation token is going to be
> super-cheap, right? We just compare the token to a value in the broker
> cache?
>
> If I understood the KIP correctly, right now it suggests that
> authentication happens when establishing the client-broker connection (as
> normal for Kafka. But perhaps we want to consider authenticating every
> request with delegation token (if exists)?
>
> So a centralized app can create few producers, do the metadata request and
> broker discovery with its own user auth, but then use delegation tokens to
> allow performing produce/fetch requests as different users? Instead of
> having to re-connect for each impersonated user?
>
> This may over-complicate things quite a bit (basically adding extra
> information in every request), but maybe it will be useful for
> impersonation use-cases (which seem to drive much of the interest in this
> KIP)?
> Kafka Connect, NiFi and friends can probably use this to share clients
> between multiple jobs, tasks, etc.
>
> What do you think?
>
> Gwen
>
> On Tue, Dec 13, 2016 at 12:43 AM, Manikumar 
> wrote:
>
> > Ashish,
> >
> > Thank you for reviewing the KIP.  Please see the replies inline.
> >
> >
> > > 1. How to disable delegation token authentication?
> > >
> > > This can be achieved in various ways, however I think reusing
> delegation
> > > token secret config for this makes sense here. Avoids creating yet
> > another
> > > config and forces delegation token users to consciously set the secret.
> > If
> > > the secret is not set or set to empty string, brokers should turn off
> > > delegation token support. This will however require a new error code to
> > > indicate delegation token support is turned off on broker.
> > >
> >
> >   Thanks for the suggestion. Option to turnoff delegation token
> > authentication will be useful.
> >   I'll update the KIP.
> >
> >
> > >
> > > 2. ACLs on delegation token?
> > >
> > > Do we need to have ACLs defined for tokens? I do not think it buys us
> > > anything, as delegation token can be treated as impersonation of the
> > owner.
> > > Any thing the owner has permission to do, delegation tokens should be
> > > allowed to do as well. If so, we probably won't need to return
> > > authorization exception error code while creating delegation token. It
> > > however would make sense to check renew and expire requests are coming
> > from
> > > owner or renewers of the token, but that does not require explicit
> acls.
> > >
> >
> >
> > Yes, We agreed to not have new acl on who can request delegation token.
> >  I'll update the KIP.
> >
> >
> > >
> > > 3. How to restrict max life time of a token?
> > >
> > > Admins might want to restrict max life time of tokens created on a
> > cluster,
> > > and this can very from cluster to cluster based on use-cases. This
> might
> > > warrant a separate broker config.
> > >
> > >
> > Currently we  have "delegation.token.max.lifetime.sec" server config
> > property
> > May be we can take min(User supplied MaxTime, Server MaxTime) as max life
> > time.
> > I am open to add new config property.
> >
> > Few more comments based on recent KIP update.
> > >
> > > 1. Do we need a separate {{InvalidateTokenRequest}}? Can't we use
> > > {{ExpireTokenRequest}} with with expiryDate set to anything before
> > current
> > > date?
> > >
> >
> > makes sense. we don't need special request to cancel the token. We can
> use
> > ExpireTokenRequest.
> > I'll update the KIP.
> >
> >
> > > 2. Can we change time field names to indicate their unit is
> milliseconds,
> > > like, IssueDateMs, ExpiryDateMs, etc.?
> > >
> > >
> >   Done.
> >
> >
> > > 3. Can we allow users to renew a token for a specified amount of time?
> In
> > > current version of KIP, renew request does not take time as a param,
> not
> > > sure what is expiry time set to after renewal.
> > >
> > >
> >  Yes, we need to specify renew period.  I'll update the KIP.
> >
> >
> > Thanks,
> > Mankumar
> >
> >
> >
> > >
> > > On Mon, Dec 12, 2016 at 9:08 AM Manikumar 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > I would like to reinitiate the discussion on Delegation token support
> > for
> > > >
> > > > Kafka.
> > > >
> > > >
> > > >
> > > > Brief summary of the past discussion:
> > > >
> > > >
> > > >
> > > > 1) Broker stores delegation tokens in zookeeper.  All brokers will
> > have a
> > > >
> > > > cache backed by
> > > >
> > > >zookeeper so they will all get notified whenever a new token is
> > > 

[jira] [Created] (KAFKA-4535) http://kafka.apache.org/quickstart Step 8 missing argument

2016-12-14 Thread Xin (JIRA)
Xin created KAFKA-4535:
--

 Summary: http://kafka.apache.org/quickstart Step 8  missing  
argument
 Key: KAFKA-4535
 URL: https://issues.apache.org/jira/browse/KAFKA-4535
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 0.10.1.0
Reporter: Xin
Priority: Trivial


Step 8: Use Kafka Streams to process data:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer


===> result:
Missing required argument "[zookeeper]"

the command  need "--new-consumer“





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4535) http://kafka.apache.org/quickstart Step 8 missing argument

2016-12-14 Thread Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin updated KAFKA-4535:
---
   Labels: documentation  (was: )
Fix Version/s: 0.10.1.1
   Status: Patch Available  (was: Open)

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-Counts-changelog \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property 
value.deserializer=org.apache.kafka.common.serialization.LongDeserializer  \
--new-consumer

> http://kafka.apache.org/quickstart Step 8  missing  argument
> 
>
> Key: KAFKA-4535
> URL: https://issues.apache.org/jira/browse/KAFKA-4535
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.10.1.0
>Reporter: Xin
>Priority: Trivial
>  Labels: documentation
> Fix For: 0.10.1.1
>
>
> Step 8: Use Kafka Streams to process data:
> > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property 
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property 
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
> ===> result:
> Missing required argument "[zookeeper]"
> the command  need "--new-consumer“



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4473) RecordCollector should handle retriable exceptions more strictly

2016-12-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4473:
--
Status: Patch Available  (was: In Progress)

> RecordCollector should handle retriable exceptions more strictly
> 
>
> Key: KAFKA-4473
> URL: https://issues.apache.org/jira/browse/KAFKA-4473
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Thomas Schulz
>Assignee: Damian Guy
>Priority: Critical
>  Labels: architecture
>
> see: https://groups.google.com/forum/#!topic/confluent-platform/DT5bk1oCVk8
> There is probably a bug in the RecordCollector as described in my detailed 
> Cluster test published in the aforementioned post.
> The class RecordCollector has the following behavior:
> - if there is no exception, add the message offset to a map
> - otherwise, do not add the message offset and instead log the above statement
> Is it possible that this offset map contains the latest offset to commit? If 
> so, a message that fails might be overriden be a successful (later) message 
> and the consumer commits every message up to the latest offset?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2253: KAFKA-4534: StreamPartitionAssignor only ever upda...

2016-12-14 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2253

KAFKA-4534: StreamPartitionAssignor only ever updates the 
partitionsByHostState and metadataWithInternalTopics on first assignment.

`partitionsByHostState` and `metadataWithInternalTopics` need to be updated 
on each call to `onAssignment()` otherwise they contain invalid/stale metadata. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4534

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2253.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2253


commit 45833be3ca1b6f8ac86f516cae4ff1b6571089e8
Author: Damian Guy 
Date:   2016-12-13T18:06:38Z

state store name to topic mapping incorrect

commit 29dcc18324b1a888641d6f51635a925bfbd4b074
Author: Damian Guy 
Date:   2016-12-13T20:21:09Z

address comments

commit 4f1c58aedbbcaad91ef5f8ff504eef74157d54e5
Author: Damian Guy 
Date:   2016-12-14T10:37:48Z

update partitionHostState and metadata onAssignment




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4534) StreamPartitionAssignor only ever updates the partitionsByHostState and metadataWithInternalTopics once.

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747970#comment-15747970
 ] 

ASF GitHub Bot commented on KAFKA-4534:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2253

KAFKA-4534: StreamPartitionAssignor only ever updates the 
partitionsByHostState and metadataWithInternalTopics on first assignment.

`partitionsByHostState` and `metadataWithInternalTopics` need to be updated 
on each call to `onAssignment()` otherwise they contain invalid/stale metadata. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4534

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2253.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2253


commit 45833be3ca1b6f8ac86f516cae4ff1b6571089e8
Author: Damian Guy 
Date:   2016-12-13T18:06:38Z

state store name to topic mapping incorrect

commit 29dcc18324b1a888641d6f51635a925bfbd4b074
Author: Damian Guy 
Date:   2016-12-13T20:21:09Z

address comments

commit 4f1c58aedbbcaad91ef5f8ff504eef74157d54e5
Author: Damian Guy 
Date:   2016-12-14T10:37:48Z

update partitionHostState and metadata onAssignment




> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once.
> 
>
> Key: KAFKA-4534
> URL: https://issues.apache.org/jira/browse/KAFKA-4534
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once. This results in incorrect metadata on 
> rebalances.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4534) StreamPartitionAssignor only ever updates the partitionsByHostState and metadataWithInternalTopics once.

2016-12-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4534:
--
Status: Patch Available  (was: Open)

> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once.
> 
>
> Key: KAFKA-4534
> URL: https://issues.apache.org/jira/browse/KAFKA-4534
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once. This results in incorrect metadata on 
> rebalances.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4536) Kafka clients throw NullPointerException on poll when delete the relative topic

2016-12-14 Thread mayi_hetu (JIRA)
mayi_hetu created KAFKA-4536:


 Summary: Kafka clients throw NullPointerException on poll when 
delete the relative topic
 Key: KAFKA-4536
 URL: https://issues.apache.org/jira/browse/KAFKA-4536
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.1
Reporter: mayi_hetu


1. new KafkaConsumer
val groupIdString = "test1"  
val props = new Properties();
props.put("bootstrap.servers", "99.12.143.240:9093");
props.put("group.id", groupIdString);
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset","earliest");
props.put("auto.commit.interval.ms", "5000");
props.put("metadata.max.age.ms","30");
props.put("session.timeout.ms", "3");
props.setProperty("key.deserializer", 
classOf[ByteArrayDeserializer].getName)
props.setProperty("value.deserializer", 
classOf[ByteArrayDeserializer].getName)
  props.setProperty("client.id", groupIdString)
  new KafkaConsumer[Array[Byte], Array[Byte]](props)
2. *subscribe topic through Partten*
consumer.subscribe(Pattern.compile(".*\\.sh$"), consumerRebalanceListener)
3. use poll(1000) fetching messages
4. delete topic test1.sh in Kafka broker
then the consumer throw NullPointerException
{color:red}
Exception in thread "main" java.lang.NullPointerException
at java.util.ArrayList.addAll(Unknown Source)
at 
org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271)
at org.apache.kafka.clients.Metadata.update(Metadata.java:185)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:606)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:583)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at 
TestNewConsumer$MyMirrorMakerNewConsumer.receive(TestNewConsumer.scala:146)
at TestNewConsumer$.main(TestNewConsumer.scala:38)
at TestNewConsumer.main(TestNewConsumer.scala)
{color}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4537) StreamPartitionAssignor incorrectly adds standby partitions to the partitionsByHostState map

2016-12-14 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4537:
-

 Summary: StreamPartitionAssignor incorrectly adds standby 
partitions to the partitionsByHostState map
 Key: KAFKA-4537
 URL: https://issues.apache.org/jira/browse/KAFKA-4537
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.10.2.0


If a KafkaStreams app is using Standby Tasks then StreamPartitionAssignor will 
add the standby partitions to the partitionsByHostState map for each host. This 
is incorrect as the partitionHostState map is used to resolve which host is 
hosting a particular store for a key. 
The result is that doing metadata lookups for interactive queries can return an 
incorrect host



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4536) Kafka clients throw NullPointerException on poll when delete the relative topic

2016-12-14 Thread mayi_hetu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748056#comment-15748056
 ] 

mayi_hetu commented on KAFKA-4536:
--

As I followed the code, I found  there is a MetadataListener in the  
{color:red}ConsumerCoordinator{color}.
 In the {color:red}onMetadataUpdate{color} method . it will 
*changeSubscription* and *setTopics* to the metadata.

*subscriptions.changeSubscription(topicsToSubscribe);*
*metadata.setTopics(subscriptions.groupSubscription());*

In the changeSubscription method it will only addAll topicsToSubscribe to 
groupSubscription.

*this.subscription.clear();*
*this.subscription.addAll(topicsToSubscribe);*
*this.groupSubscription.addAll(topicsToSubscribe);*

So  at 
org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271)
  *partitionInfos.addAll(cluster.partitionsForTopic(topic));*
{color:red}the topics still have the delete topic but the cluste cannot get 
partitionsForTopic for it {color}, and the cluster.partitionsForTopic(topic) 
method will return null.

> Kafka clients throw NullPointerException on poll when delete the relative 
> topic
> ---
>
> Key: KAFKA-4536
> URL: https://issues.apache.org/jira/browse/KAFKA-4536
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: mayi_hetu
>
> 1. new KafkaConsumer
>   val groupIdString = "test1"  
>   val props = new Properties();
>   props.put("bootstrap.servers", "99.12.143.240:9093");
>   props.put("group.id", groupIdString);
>   props.put("enable.auto.commit", "false");
>   props.put("auto.offset.reset","earliest");
>   props.put("auto.commit.interval.ms", "5000");
>   props.put("metadata.max.age.ms","30");
>   props.put("session.timeout.ms", "3");
> props.setProperty("key.deserializer", 
> classOf[ByteArrayDeserializer].getName)
> props.setProperty("value.deserializer", 
> classOf[ByteArrayDeserializer].getName)
>   props.setProperty("client.id", groupIdString)
>   new KafkaConsumer[Array[Byte], Array[Byte]](props)
> 2. *subscribe topic through Partten*
> consumer.subscribe(Pattern.compile(".*\\.sh$"), consumerRebalanceListener)
> 3. use poll(1000) fetching messages
> 4. delete topic test1.sh in Kafka broker
> then the consumer throw NullPointerException
> {color:red}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.ArrayList.addAll(Unknown Source)
>   at 
> org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271)
>   at org.apache.kafka.clients.Metadata.update(Metadata.java:185)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:606)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:583)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>   at 
> TestNewConsumer$MyMirrorMakerNewConsumer.receive(TestNewConsumer.scala:146)
>   at TestNewConsumer$.main(TestNewConsumer.scala:38)
>   at TestNewConsumer.main(TestNewConsumer.scala)
> {color}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-101: Alter Replication Protocol to use Leader Generation rather than High Watermark for Truncation

2016-12-14 Thread Ben Stopford
Thanks Neha. It’s a fair point. We don’t need two anymore. 

There isn’t much to be gained from changing the existing nomenclature, unless 
people feel strongly about it the word epoch. LeaderEpoch is currently fairly 
ingrained (spans Zookeeper, the controller, the metadata requests etc). I 
suggest ditch LeaderGeneration.

I’ll update the KIP.

B

Ben Stopford
Confluent, http://www.confluent.io 



> On 11 Dec 2016, at 22:30, Neha Narkhede  wrote:
> 
> Good to see this KIP being proposed. Back when I added the epoch to the
> replication protocol, we discussed adding it to the log due to the failure
> scenarios listed in the KIP but I failed to convince people that it was
> worth the effort needed to upgrade the cluster (especially after we asked
> people to go through a painful backwards incompatible upgrade for 0.8 :-))
> The lack of including the leader epoch/generation in the log has also been
> one of the biggest critiques of Kafka's replication protocol by the
> distributed systems community.
> 
> I'm in favor of this work though I think we shouldn't end up with 2 notions
> of representing a leader's generation. When we added the epoch, we wanted
> to add it to the log but we didn't. Now that we are adding the generation
> id to the log, I think we should revisit calling it the epoch at all. Have
> you thought about a way to evolve the epoch to the generation id throughout
> and what it will take?
> 
> On Sun, Dec 11, 2016 at 4:31 AM Ben Stopford  wrote:
> 
>> Hi All
>> 
>> Please find the below KIP which describes a proposed solution to a couple
>> of issues that have been observed with the replication protocol.
>> 
>> In short, the proposal replaces the use of the High Watermark, for
>> follower log trunctation, with an alternate Generation Marker. This
>> uniquely defines which leader messages were acknowledged by.
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Generation+rather+than+High+Watermark+for+Truncation
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Generation+rather+than+High+Watermark+for+Truncation
>>> 
>> 
>> All comments and suggestions greatly appreciated.
>> 
>> Ben Stopford
>> Confluent, http://www.confluent.io 
>> 
>> --
> Thanks,
> Neha



kafka_2.11-0.9.0.1 crash with java coredump

2016-12-14 Thread Mazhar Shaikh
Hi All,

I am using kafka_2.11-0.9.0.1 with java version "1.7.0_51".

On random days kafka process stops (crashes) with a java coredump file as
below.

(gdb) bt
#0 0x7f33059f70d5 in raise () from /lib/x86_64-linux-gnu/libc.so.6
#1 0x7f33059fa83b in abort () from /lib/x86_64-linux-gnu/libc.so.6
#2 0x7f33049ae405 in os::abort(bool) () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#3 0x7f3304b2d347 in VMError::report_and_die() () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#4 0x7f3304b2d8de in crash_handler(int, siginfo*, void*) () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#5 
#6 0x7f33046b92f5 in
G1BlockOffsetArray::forward_to_block_containing_addr_slow(HeapWord*,
HeapWord*, void const*) () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#7 0x7f33049a60f0 in os::print_location(outputStream*, long, bool) ()
from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#8 0x7f33049b2678 in os::print_register_info(outputStream*, void*) ()
from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#9 0x7f3304b2b94b in VMError::report(outputStream*) () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#10 0x7f3304b2cf4a in VMError::report_and_die() () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#11 0x7f33049b2d8f in JVM_handle_linux_signal () from
/opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
#12 
#13 0x7f32ffbc64bf in ?? ()
#14 0xca57b708 in ?? ()
#15 0x7f32fae97928 in ?? ()
#16 0xbf2f05e8 in ?? ()
#17 0x in ?? ()
#18 0xc3b27610 in ?? ()
#19 0xbed92898 in ?? ()
#20 0xe269aac8 in ?? ()
#21 0x in ?? ()


Can anyone suggest a solution to overcome this issue.

Thank you.

Mazhar Shaikh.


[jira] [Created] (KAFKA-4538) Version

2016-12-14 Thread Samuel Durand (JIRA)
Samuel Durand created KAFKA-4538:


 Summary: Version 
 Key: KAFKA-4538
 URL: https://issues.apache.org/jira/browse/KAFKA-4538
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.0
 Environment: Unix OS, Scala
Reporter: Samuel Durand
Priority: Minor


The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from closing 
by itself. This was not the case with the previous 0.10.0.1
To close the app you now have to add an explicit system exit.

{code:title=Bar.scala|borderStyle=solid}
object App extends App {

 doSomethingThatCreatesAndUsesAKafkaClient()

  // Necessary to close the application
  System.exit(0)
}
{code}

I didn't find out yet if that's because of some leaking process or something 
else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-12-14 Thread Edoardo Comar
Thanks Rajini,
Before Kafka-4137, we could avoid coordinator starvation without making a 
special case for a special connection,
but rather simply, in applying the buffer.memory check only to 'large' 
responses 
(e.g.  size > 1k, possibly introducing a new config entry) in 

NetworkReceive.readFromReadableChannel(ReadableByteChannel)

Essentially this would limit reading fetch responses but allow for other 
responses to be processed.

This is a sample of sizes for responses I collected :

* size=108 APIKEY=3 METADATA
*  size=28 APIKEY=10 GROUP_COORDINATOR
*  size=193 APIKEY=11 JOIN_GROUP
*  size=39 APIKEY=14 SYNC_GROUP
*  size=39 APIKEY=9 OFFSET_FETCH
*  size=45 APIKEY=2 LIST_OFFSETS
*  size=88926 APIKEY=1 FETCH
*  size=45 APIKEY=1 FETCH
*  size=6 APIKEY=12 HEARTBEAT
*  size=45 APIKEY=1 FETCH
*  size=45 APIKEY=1 FETCH
*  size=45 APIKEY=1 FETCH
*  size=6 APIKEY=12 HEARTBEAT
*  size=45 APIKEY=1 FETCH
*  size=45 APIKEY=1 FETCH

What do you think?
--
Edoardo Comar
IBM MessageHub
eco...@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN

IBM United Kingdom Limited Registered in England and Wales with number 
741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 
3AU



From:   Rajini Sivaram 
To: dev@kafka.apache.org
Date:   13/12/2016 17:27
Subject:Re: [DISCUSS] KIP-81: Max in-flight fetches



Coordinator starvation: For an implementation based on KIP-72, there will
be coordinator starvation without KAFKA-4137 since you would stop reading
from sockets when the memory pool is full (the fact that coordinator
messages are small doesn't help). I imagine you can work around this by
treating coordinator connections as special connections but that spills
over to common network code. Separate NetworkClient for coordinator
proposed in KAFKA-4137 would be much better.

On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison 
wrote:

> Thanks for all the feedback.
>
> I've updated the KIP with all the details.
> Below are a few of the main points:
>
> - Overall memory usage of the consumer:
> I made it clear the memory pool is only used to store the raw bytes
> from the network and that the decompressed/deserialized messages are
> not stored in it but as extra memory on the heap. In addition, the
> consumer also keeps track of other things (in flight requests,
> subscriptions, etc..) that account for extra memory as well. So this
> is not a hard bound memory constraint but should still allow to
> roughly size how much memory can be used.
>
> - Relation with the existing settings:
> There are already 2 settings that deal with memory usage of the
> consumer. I suggest we lower the priority of
> `max.partition.fetch.bytes` (I wonder if we should attempt to
> deprecate it or increase its default value so it's a contraint less
> likely to be hit) and have the new setting `buffer.memory` as High.
> I'm a bit unsure what's the best default value for `buffer.memory`, I
> suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate
> feedback. It should always at least be equal to `max.fetch.bytes`.
>
> - Configuration name `buffer.memory`:
> I think it's the name that makes the most sense. It's aligned with the
> producer and as mentioned generic enough to allow future changes if
> needed.
>
> - Coordination starvation:
> Yes this is a potential issue. I'd expect these requests to be small
> enough to not be affected too much. If that's the case KAFKA-4137
> suggests a possible fix.
>
>
>
> On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma  wrote:
> > Makes sense Jay.
> >
> > Mickael, in addition to how we can compute defaults of the other 
settings
> > from `buffer.memory`, it would be good to specify what is allowed and 
how
> > we handle the different cases (e.g. what do we do if
> > `max.partition.fetch.bytes`
> > is greater than `buffer.memory`, is that simply not allowed?).
> >
> > To summarise the gap between the ideal scenario (user specifies how 
much
> > memory the consumer can use) and what is being proposed:
> >
> > 1. We will decompress and deserialize the data for one or more 
partitions
> > in order to return them to the user and we don't account for the
> increased
> > memory usage resulting from that. This is likely to be significant on 
a
> per
> > record basis, but we try to do it for the minimal number of records
> > possible within the constraints of the system. Currently the 
constraints
> > are: we decompress and deserialize the data for a partition at a time
> > (default `max.partition.fetch.bytes` is 1MB, but this is a soft limit 
in
> > case there are oversized messages) until we have enough records to
> > satisfy `max.poll.records`
> > (default 500) or there are no more completed fetches. It seems like 
this
> > may be OK for a lot of cases, but some tuning will still be required 
in
> > others.
> >
> > 2. We don't account for bookkeeping data structures or intermediate

Re: [DISCUSS] KIP-101: Alter Replication Protocol to use Leader Generation rather than High Watermark for Truncation

2016-12-14 Thread Ben Stopford
Thanks Onur. That’s correct, we no longer nee that extra mapping. I’ll update 
the KIP. 
B

Ben Stopford
Confluent, http://www.confluent.io 



> On 11 Dec 2016, at 23:54, Onur Karaman  wrote:
> 
> Pretty happy to see a KIP tackling this problem! One comment below.
> 
> The "Extending LeaderEpoch to include Returning Leaders" states:
> "To protect against this eventuality the controller will maintain a cached
> mapping of [broker -> Zookeeper CZXID] (CZXID is a unique and monotonic
> 64-bit number) for the broker’s registration in Zookeeper
> (/brokers/ids/[brokerId]). If the controller receives a Broker Registration
> where the CZXID has changed it will increment the Leader Epoch and
> propagate that value to the broker via the Leader and ISR Request (in the
> normal way), then update the cached CZXID for that broker."
> 
> In general I think kafka underutilizes zookeeper's various flavors of zxids
> but this time it's not clear to me what the motivation is for maintaining
> the broker to czxid mapping. It seems that the following check is
> redundant: "If the controller receives a Broker Registration where the
> CZXID has changed". By definition, the czxid of the /brokers/ids/[brokerId]
> znode will always change upon successful broker registration (
> https://zookeeper.apache.org/doc/r3.4.8/zookeeperProgrammers.html#sc_zkStatStructure).
> Why maintain the mapping at all? Why not just always update leader epochs
> and propagate every time the controller receives the broker registration zk
> event?
> 
> On Sun, Dec 11, 2016 at 2:30 PM, Neha Narkhede  wrote:
> 
>> Good to see this KIP being proposed. Back when I added the epoch to the
>> replication protocol, we discussed adding it to the log due to the failure
>> scenarios listed in the KIP but I failed to convince people that it was
>> worth the effort needed to upgrade the cluster (especially after we asked
>> people to go through a painful backwards incompatible upgrade for 0.8 :-))
>> The lack of including the leader epoch/generation in the log has also been
>> one of the biggest critiques of Kafka's replication protocol by the
>> distributed systems community.
>> 
>> I'm in favor of this work though I think we shouldn't end up with 2 notions
>> of representing a leader's generation. When we added the epoch, we wanted
>> to add it to the log but we didn't. Now that we are adding the generation
>> id to the log, I think we should revisit calling it the epoch at all. Have
>> you thought about a way to evolve the epoch to the generation id throughout
>> and what it will take?
>> 
>> On Sun, Dec 11, 2016 at 4:31 AM Ben Stopford  wrote:
>> 
>>> Hi All
>>> 
>>> Please find the below KIP which describes a proposed solution to a couple
>>> of issues that have been observed with the replication protocol.
>>> 
>>> In short, the proposal replaces the use of the High Watermark, for
>>> follower log trunctation, with an alternate Generation Marker. This
>>> uniquely defines which leader messages were acknowledged by.
>>> 
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 101+-+Alter+Replication+Protocol+to+use+Leader+
>> Generation+rather+than+High+Watermark+for+Truncation
>>> <
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 101+-+Alter+Replication+Protocol+to+use+Leader+
>> Generation+rather+than+High+Watermark+for+Truncation
 
>>> 
>>> All comments and suggestions greatly appreciated.
>>> 
>>> Ben Stopford
>>> Confluent, http://www.confluent.io 
>>> 
>>> --
>> Thanks,
>> Neha
>> 



Re: [DISCUSS] KIP-101: Alter Replication Protocol to use Leader Generation rather than High Watermark for Truncation

2016-12-14 Thread Ben Stopford
To clarify slightly, the case described in the KIP doesn’t necessitate an extra 
mapping to the CZXID. But there is an issue filed against the controller, which 
would also affect the LeaderGeneration correctness. The suggested fix for this 
includes such a mapping, according to Jun’s reasoning in the Jira comments: 
https://issues.apache.org/jira/browse/KAFKA-1120 
. Strictly speaking this is a 
separate issue though and I’ve updated the KIP accordingly. 

B
Ben Stopford
Confluent, http://www.confluent.io 



> On 14 Dec 2016, at 11:37, Ben Stopford  wrote:
> 
> Thanks Onur. That’s correct, we no longer nee that extra mapping. I’ll update 
> the KIP. 
> B
> 
> Ben Stopford
> Confluent, http://www.confluent.io 
> 
> 
> 
>> On 11 Dec 2016, at 23:54, Onur Karaman > > wrote:
>> 
>> Pretty happy to see a KIP tackling this problem! One comment below.
>> 
>> The "Extending LeaderEpoch to include Returning Leaders" states:
>> "To protect against this eventuality the controller will maintain a cached
>> mapping of [broker -> Zookeeper CZXID] (CZXID is a unique and monotonic
>> 64-bit number) for the broker’s registration in Zookeeper
>> (/brokers/ids/[brokerId]). If the controller receives a Broker Registration
>> where the CZXID has changed it will increment the Leader Epoch and
>> propagate that value to the broker via the Leader and ISR Request (in the
>> normal way), then update the cached CZXID for that broker."
>> 
>> In general I think kafka underutilizes zookeeper's various flavors of zxids
>> but this time it's not clear to me what the motivation is for maintaining
>> the broker to czxid mapping. It seems that the following check is
>> redundant: "If the controller receives a Broker Registration where the
>> CZXID has changed". By definition, the czxid of the /brokers/ids/[brokerId]
>> znode will always change upon successful broker registration (
>> https://zookeeper.apache.org/doc/r3.4.8/zookeeperProgrammers.html#sc_zkStatStructure
>>  
>> ).
>> Why maintain the mapping at all? Why not just always update leader epochs
>> and propagate every time the controller receives the broker registration zk
>> event?
>> 
>> On Sun, Dec 11, 2016 at 2:30 PM, Neha Narkhede > > wrote:
>> 
>>> Good to see this KIP being proposed. Back when I added the epoch to the
>>> replication protocol, we discussed adding it to the log due to the failure
>>> scenarios listed in the KIP but I failed to convince people that it was
>>> worth the effort needed to upgrade the cluster (especially after we asked
>>> people to go through a painful backwards incompatible upgrade for 0.8 :-))
>>> The lack of including the leader epoch/generation in the log has also been
>>> one of the biggest critiques of Kafka's replication protocol by the
>>> distributed systems community.
>>> 
>>> I'm in favor of this work though I think we shouldn't end up with 2 notions
>>> of representing a leader's generation. When we added the epoch, we wanted
>>> to add it to the log but we didn't. Now that we are adding the generation
>>> id to the log, I think we should revisit calling it the epoch at all. Have
>>> you thought about a way to evolve the epoch to the generation id throughout
>>> and what it will take?
>>> 
>>> On Sun, Dec 11, 2016 at 4:31 AM Ben Stopford >> > wrote:
>>> 
 Hi All
 
 Please find the below KIP which describes a proposed solution to a couple
 of issues that have been observed with the replication protocol.
 
 In short, the proposal replaces the use of the High Watermark, for
 follower log trunctation, with an alternate Generation Marker. This
 uniquely defines which leader messages were acknowledged by.
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP- 
 
>>> 101+-+Alter+Replication+Protocol+to+use+Leader+
>>> Generation+rather+than+High+Watermark+for+Truncation
 <
 https://cwiki.apache.org/confluence/display/KAFKA/KIP- 
 
>>> 101+-+Alter+Replication+Protocol+to+use+Leader+
>>> Generation+rather+than+High+Watermark+for+Truncation
> 
 
 All comments and suggestions greatly appreciated.
 
 Ben Stopford
 Confluent, http://www.confluent.io  
 >
 
 --
>>> Thanks,
>>> Neha
>>> 
> 



[GitHub] kafka pull request #2254: KAFKA-4537: StreamPartitionAssignor incorrectly ad...

2016-12-14 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2254

KAFKA-4537: StreamPartitionAssignor incorrectly adds standby partitions to 
the partitionsByHostState map

If a KafkaStreams app is using Standby Tasks then `StreamPartitionAssignor` 
will add the standby partitions to the partitionsByHostState map for each host. 
This is incorrect as the partitionHostState map is used to resolve which host 
is hosting a particular store for a key. 
The result is that doing metadata lookups for interactive queries can 
return an incorrect host

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka KAFKA-4537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2254


commit a29f74642cecb1722bbd785d42c2b0782d01c9d3
Author: Damian Guy 
Date:   2016-12-14T11:53:42Z

dont assign standby tasks to partitionsForHostState




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4537) StreamPartitionAssignor incorrectly adds standby partitions to the partitionsByHostState map

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748147#comment-15748147
 ] 

ASF GitHub Bot commented on KAFKA-4537:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2254

KAFKA-4537: StreamPartitionAssignor incorrectly adds standby partitions to 
the partitionsByHostState map

If a KafkaStreams app is using Standby Tasks then `StreamPartitionAssignor` 
will add the standby partitions to the partitionsByHostState map for each host. 
This is incorrect as the partitionHostState map is used to resolve which host 
is hosting a particular store for a key. 
The result is that doing metadata lookups for interactive queries can 
return an incorrect host

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka KAFKA-4537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2254


commit a29f74642cecb1722bbd785d42c2b0782d01c9d3
Author: Damian Guy 
Date:   2016-12-14T11:53:42Z

dont assign standby tasks to partitionsForHostState




> StreamPartitionAssignor incorrectly adds standby partitions to the 
> partitionsByHostState map
> 
>
> Key: KAFKA-4537
> URL: https://issues.apache.org/jira/browse/KAFKA-4537
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> If a KafkaStreams app is using Standby Tasks then StreamPartitionAssignor 
> will add the standby partitions to the partitionsByHostState map for each 
> host. This is incorrect as the partitionHostState map is used to resolve 
> which host is hosting a particular store for a key. 
> The result is that doing metadata lookups for interactive queries can return 
> an incorrect host



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-12-14 Thread Rajini Sivaram
It would clearly be very useful to enable clients to send requests on
behalf of multiple users. A separate KIP makes sense, but it may be worth
thinking through some of the implications now, especially if the main
interest in delegation tokens comes from its potential to enable
impersonation.

I understand that delegation tokens are only expected to be used with TLS.
But the choice of SASL/SCRAM for authentication must be based on a
requirement to protect the tokenHmac - otherwise you could just use
SASL/PLAIN. With SASL/SCRAM the tokenHmac is never propagated on-the-wire,
only a salted-hashed version of it is used in the SASL authentication
exchange. If impersonation is based on sending tokenHmac in requests, any
benefit of using SCRAM is lost.

An alternative may be to allow clients to authenticate multiple times using
SASL and include one of its authenticated principals in each request
(optionally). I haven't thought it through yet, obviously. But if the
approach is of interest and no one is working on a KIP for impersonation at
the moment, I am happy to write one. It may provide something for
comparison at least.

Thoughts?


On Wed, Dec 14, 2016 at 9:53 AM, Manikumar 
wrote:

> That's a good idea. Authenticating every request with delegation token will
> be useful for
> impersonation use-cases. But as of now, we are thinking delegation token as
> just another way
> to authenticate the users. We haven't think through all the use cases
> related to
> impersonation or using delegation token for impersonation. We want to
> handle impersonation
> (KAFKA-3712) as part of separate KIP.
>
> Will that be Ok?
>
>
> On Wed, Dec 14, 2016 at 8:09 AM, Gwen Shapira  wrote:
>
> > Thinking out loud here:
> >
> > It looks like authentication with a delegation token is going to be
> > super-cheap, right? We just compare the token to a value in the broker
> > cache?
> >
> > If I understood the KIP correctly, right now it suggests that
> > authentication happens when establishing the client-broker connection (as
> > normal for Kafka. But perhaps we want to consider authenticating every
> > request with delegation token (if exists)?
> >
> > So a centralized app can create few producers, do the metadata request
> and
> > broker discovery with its own user auth, but then use delegation tokens
> to
> > allow performing produce/fetch requests as different users? Instead of
> > having to re-connect for each impersonated user?
> >
> > This may over-complicate things quite a bit (basically adding extra
> > information in every request), but maybe it will be useful for
> > impersonation use-cases (which seem to drive much of the interest in this
> > KIP)?
> > Kafka Connect, NiFi and friends can probably use this to share clients
> > between multiple jobs, tasks, etc.
> >
> > What do you think?
> >
> > Gwen
> >
> > On Tue, Dec 13, 2016 at 12:43 AM, Manikumar 
> > wrote:
> >
> > > Ashish,
> > >
> > > Thank you for reviewing the KIP.  Please see the replies inline.
> > >
> > >
> > > > 1. How to disable delegation token authentication?
> > > >
> > > > This can be achieved in various ways, however I think reusing
> > delegation
> > > > token secret config for this makes sense here. Avoids creating yet
> > > another
> > > > config and forces delegation token users to consciously set the
> secret.
> > > If
> > > > the secret is not set or set to empty string, brokers should turn off
> > > > delegation token support. This will however require a new error code
> to
> > > > indicate delegation token support is turned off on broker.
> > > >
> > >
> > >   Thanks for the suggestion. Option to turnoff delegation token
> > > authentication will be useful.
> > >   I'll update the KIP.
> > >
> > >
> > > >
> > > > 2. ACLs on delegation token?
> > > >
> > > > Do we need to have ACLs defined for tokens? I do not think it buys us
> > > > anything, as delegation token can be treated as impersonation of the
> > > owner.
> > > > Any thing the owner has permission to do, delegation tokens should be
> > > > allowed to do as well. If so, we probably won't need to return
> > > > authorization exception error code while creating delegation token.
> It
> > > > however would make sense to check renew and expire requests are
> coming
> > > from
> > > > owner or renewers of the token, but that does not require explicit
> > acls.
> > > >
> > >
> > >
> > > Yes, We agreed to not have new acl on who can request delegation token.
> > >  I'll update the KIP.
> > >
> > >
> > > >
> > > > 3. How to restrict max life time of a token?
> > > >
> > > > Admins might want to restrict max life time of tokens created on a
> > > cluster,
> > > > and this can very from cluster to cluster based on use-cases. This
> > might
> > > > warrant a separate broker config.
> > > >
> > > >
> > > Currently we  have "delegation.token.max.lifetime.sec" server config
> > > property
> > > May be we can take min(User supplied MaxTime, Server MaxTime) as max
> life
> > > time.
> > > I am open to add

[jira] [Commented] (KAFKA-4522) Using Disruptor instead of Array Blocking queue in Kafka Producer

2016-12-14 Thread Ben Stopford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748169#comment-15748169
 ] 

Ben Stopford commented on KAFKA-4522:
-

It'd be good to have some empirical evidence that this was indeed a bottleneck 
in the wider flow.

> Using Disruptor instead of Array Blocking queue in Kafka Producer
> -
>
> Key: KAFKA-4522
> URL: https://issues.apache.org/jira/browse/KAFKA-4522
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Pratik kumar
>
> Kafka Producer currently uses Java's Array Blocking Queue to store outbound 
> kafka message before batching them in async mode. In case of high production 
> rate of kafka messages,this adds to lock contention on the user and is 
> generally hidden from user.(quoting from personal experience)
> Usage of LMAX Disruptor can reduce the lock contention overhead put by Kafka 
> Producer 
> LMAX Disruptor -> https://github.com/LMAX-Exchange/disruptor
> Also can someone help me understand if blocking queue gives any guarantees 
> inherent to kafka's design(and hence is irreplaceable)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4539) NPE in StreamThread when creating StandbyTasks

2016-12-14 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4539:
-

 Summary: NPE in StreamThread when creating StandbyTasks
 Key: KAFKA-4539
 URL: https://issues.apache.org/jira/browse/KAFKA-4539
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.10.2.0


Fails because {{createStandbyTask(..)}} can return null fi the topology for the 
{{TaskId}} doesn't have any state stores.

{code}
[2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
kafka-music-charts failed on partition assignment 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-4539) NPE in StreamThread when creating StandbyTasks

2016-12-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4539 started by Damian Guy.
-
> NPE in StreamThread when creating StandbyTasks
> --
>
> Key: KAFKA-4539
> URL: https://issues.apache.org/jira/browse/KAFKA-4539
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> Fails because {{createStandbyTask(..)}} can return null fi the topology for 
> the {{TaskId}} doesn't have any state stores.
> {code}
> [2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> kafka-music-charts failed on partition assignment 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-12-14 Thread Rajini Sivaram
Edo,

I wouldn't introduce a new config entry, especially since you don't need it
after KAFKA-4137. As a temporary measure that would work for consumers. But
you probably don't want to do the same for brokers - will be worth checking
with Radai since the implementation will be based on KIP-72. To do this
only for consumers, you will need some conditions in the common network
code while allocating and releasing buffers. A bit messy, but doable.



On Wed, Dec 14, 2016 at 11:32 AM, Edoardo Comar  wrote:

> Thanks Rajini,
> Before Kafka-4137, we could avoid coordinator starvation without making a
> special case for a special connection,
> but rather simply, in applying the buffer.memory check only to 'large'
> responses
> (e.g.  size > 1k, possibly introducing a new config entry) in
>
> NetworkReceive.readFromReadableChannel(ReadableByteChannel)
>
> Essentially this would limit reading fetch responses but allow for other
> responses to be processed.
>
> This is a sample of sizes for responses I collected :
>
> * size=108 APIKEY=3 METADATA
> *  size=28 APIKEY=10 GROUP_COORDINATOR
> *  size=193 APIKEY=11 JOIN_GROUP
> *  size=39 APIKEY=14 SYNC_GROUP
> *  size=39 APIKEY=9 OFFSET_FETCH
> *  size=45 APIKEY=2 LIST_OFFSETS
> *  size=88926 APIKEY=1 FETCH
> *  size=45 APIKEY=1 FETCH
> *  size=6 APIKEY=12 HEARTBEAT
> *  size=45 APIKEY=1 FETCH
> *  size=45 APIKEY=1 FETCH
> *  size=45 APIKEY=1 FETCH
> *  size=6 APIKEY=12 HEARTBEAT
> *  size=45 APIKEY=1 FETCH
> *  size=45 APIKEY=1 FETCH
>
> What do you think?
> --
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
>
> From:   Rajini Sivaram 
> To: dev@kafka.apache.org
> Date:   13/12/2016 17:27
> Subject:Re: [DISCUSS] KIP-81: Max in-flight fetches
>
>
>
> Coordinator starvation: For an implementation based on KIP-72, there will
> be coordinator starvation without KAFKA-4137 since you would stop reading
> from sockets when the memory pool is full (the fact that coordinator
> messages are small doesn't help). I imagine you can work around this by
> treating coordinator connections as special connections but that spills
> over to common network code. Separate NetworkClient for coordinator
> proposed in KAFKA-4137 would be much better.
>
> On Tue, Dec 13, 2016 at 3:47 PM, Mickael Maison 
> wrote:
>
> > Thanks for all the feedback.
> >
> > I've updated the KIP with all the details.
> > Below are a few of the main points:
> >
> > - Overall memory usage of the consumer:
> > I made it clear the memory pool is only used to store the raw bytes
> > from the network and that the decompressed/deserialized messages are
> > not stored in it but as extra memory on the heap. In addition, the
> > consumer also keeps track of other things (in flight requests,
> > subscriptions, etc..) that account for extra memory as well. So this
> > is not a hard bound memory constraint but should still allow to
> > roughly size how much memory can be used.
> >
> > - Relation with the existing settings:
> > There are already 2 settings that deal with memory usage of the
> > consumer. I suggest we lower the priority of
> > `max.partition.fetch.bytes` (I wonder if we should attempt to
> > deprecate it or increase its default value so it's a contraint less
> > likely to be hit) and have the new setting `buffer.memory` as High.
> > I'm a bit unsure what's the best default value for `buffer.memory`, I
> > suggested 100MB in the KIP (2 x `fetch.max.bytes`), but I'd appreciate
> > feedback. It should always at least be equal to `max.fetch.bytes`.
> >
> > - Configuration name `buffer.memory`:
> > I think it's the name that makes the most sense. It's aligned with the
> > producer and as mentioned generic enough to allow future changes if
> > needed.
> >
> > - Coordination starvation:
> > Yes this is a potential issue. I'd expect these requests to be small
> > enough to not be affected too much. If that's the case KAFKA-4137
> > suggests a possible fix.
> >
> >
> >
> > On Tue, Dec 13, 2016 at 9:31 AM, Ismael Juma  wrote:
> > > Makes sense Jay.
> > >
> > > Mickael, in addition to how we can compute defaults of the other
> settings
> > > from `buffer.memory`, it would be good to specify what is allowed and
> how
> > > we handle the different cases (e.g. what do we do if
> > > `max.partition.fetch.bytes`
> > > is greater than `buffer.memory`, is that simply not allowed?).
> > >
> > > To summarise the gap between the ideal scenario (user specifies how
> much
> > > memory the consumer can use) and what is being proposed:
> > >
> > > 1. We will decompress and deserialize the data for one or more
> partitions
> > > in order to return them to the user and we don't account for the
> > increased
> > > memory us

[GitHub] kafka pull request #2255: KAFKA-4539: NPE in StreamThread when creating Stan...

2016-12-14 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2255

KAFKA-4539: NPE in StreamThread when creating StandbyTasks

Tasks that don't have any `StateStore`s wont have a `StandbyTask`, so 
`createStandbyTask` can return `null`. We need to check for this in 
`StandbyTaskCreator.createTask(...)`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4539

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2255.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2255


commit aa1e6b7ce213ccb6019b7f16e9148cad00cf66f9
Author: Damian Guy 
Date:   2016-12-14T13:08:40Z

fix NPE when creating standby tasks




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4539) NPE in StreamThread when creating StandbyTasks

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748306#comment-15748306
 ] 

ASF GitHub Bot commented on KAFKA-4539:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2255

KAFKA-4539: NPE in StreamThread when creating StandbyTasks

Tasks that don't have any `StateStore`s wont have a `StandbyTask`, so 
`createStandbyTask` can return `null`. We need to check for this in 
`StandbyTaskCreator.createTask(...)`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4539

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2255.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2255


commit aa1e6b7ce213ccb6019b7f16e9148cad00cf66f9
Author: Damian Guy 
Date:   2016-12-14T13:08:40Z

fix NPE when creating standby tasks




> NPE in StreamThread when creating StandbyTasks
> --
>
> Key: KAFKA-4539
> URL: https://issues.apache.org/jira/browse/KAFKA-4539
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> Fails because {{createStandbyTask(..)}} can return null fi the topology for 
> the {{TaskId}} doesn't have any state stores.
> {code}
> [2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> kafka-music-charts failed on partition assignment 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4539) NPE in StreamThread when creating StandbyTasks

2016-12-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4539:
--
Status: Patch Available  (was: In Progress)

> NPE in StreamThread when creating StandbyTasks
> --
>
> Key: KAFKA-4539
> URL: https://issues.apache.org/jira/browse/KAFKA-4539
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> Fails because {{createStandbyTask(..)}} can return null fi the topology for 
> the {{TaskId}} doesn't have any state stores.
> {code}
> [2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> kafka-music-charts failed on partition assignment 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4538) Version

2016-12-14 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748324#comment-15748324
 ] 

Rajini Sivaram commented on KAFKA-4538:
---

If you are using KafkaConsumer, this could be due to KAFKA-4431.

> Version 
> 
>
> Key: KAFKA-4538
> URL: https://issues.apache.org/jira/browse/KAFKA-4538
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Unix OS, Scala
>Reporter: Samuel Durand
>Priority: Minor
>
> The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from 
> closing by itself. This was not the case with the previous 0.10.0.1
> To close the app you now have to add an explicit system exit.
> {code:title=Bar.scala|borderStyle=solid}
> object App extends App {
>  doSomethingThatCreatesAndUsesAKafkaClient()
>   // Necessary to close the application
>   System.exit(0)
> }
> {code}
> I didn't find out yet if that's because of some leaking process or something 
> else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4536) Kafka clients throw NullPointerException on poll when delete the relative topic

2016-12-14 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748359#comment-15748359
 ] 

Rajini Sivaram commented on KAFKA-4536:
---

This is fixed in 0.10.1.0.

> Kafka clients throw NullPointerException on poll when delete the relative 
> topic
> ---
>
> Key: KAFKA-4536
> URL: https://issues.apache.org/jira/browse/KAFKA-4536
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: mayi_hetu
>
> 1. new KafkaConsumer
>   val groupIdString = "test1"  
>   val props = new Properties();
>   props.put("bootstrap.servers", "99.12.143.240:9093");
>   props.put("group.id", groupIdString);
>   props.put("enable.auto.commit", "false");
>   props.put("auto.offset.reset","earliest");
>   props.put("auto.commit.interval.ms", "5000");
>   props.put("metadata.max.age.ms","30");
>   props.put("session.timeout.ms", "3");
> props.setProperty("key.deserializer", 
> classOf[ByteArrayDeserializer].getName)
> props.setProperty("value.deserializer", 
> classOf[ByteArrayDeserializer].getName)
>   props.setProperty("client.id", groupIdString)
>   new KafkaConsumer[Array[Byte], Array[Byte]](props)
> 2. *subscribe topic through Partten*
> consumer.subscribe(Pattern.compile(".*\\.sh$"), consumerRebalanceListener)
> 3. use poll(1000) fetching messages
> 4. delete topic test1.sh in Kafka broker
> then the consumer throw NullPointerException
> {color:red}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.ArrayList.addAll(Unknown Source)
>   at 
> org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271)
>   at org.apache.kafka.clients.Metadata.update(Metadata.java:185)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:606)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:583)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>   at 
> TestNewConsumer$MyMirrorMakerNewConsumer.receive(TestNewConsumer.scala:146)
>   at TestNewConsumer$.main(TestNewConsumer.scala:38)
>   at TestNewConsumer.main(TestNewConsumer.scala)
> {color}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2253: KAFKA-4534: StreamPartitionAssignor only ever upda...

2016-12-14 Thread dguy
Github user dguy closed the pull request at:

https://github.com/apache/kafka/pull/2253


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4534) StreamPartitionAssignor only ever updates the partitionsByHostState and metadataWithInternalTopics once.

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748376#comment-15748376
 ] 

ASF GitHub Bot commented on KAFKA-4534:
---

Github user dguy closed the pull request at:

https://github.com/apache/kafka/pull/2253


> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once.
> 
>
> Key: KAFKA-4534
> URL: https://issues.apache.org/jira/browse/KAFKA-4534
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once. This results in incorrect metadata on 
> rebalances.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4534) StreamPartitionAssignor only ever updates the partitionsByHostState and metadataWithInternalTopics once.

2016-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748380#comment-15748380
 ] 

ASF GitHub Bot commented on KAFKA-4534:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2256

KAFKA-4534: StreamPartitionAssignor only ever updates the 
partitionsByHostState and metadataWithInternalTopics on first assignment

partitionsByHostState and metadataWithInternalTopics need to be updated on 
each call to onAssignment() otherwise they contain invalid/stale metadata.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka 4534

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2256


commit 3437ff3a8883d22bce09aca14161f3d958c3e2ac
Author: Damian Guy 
Date:   2016-12-14T13:50:55Z

update partitionHostState and metadata onAssignment




> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once.
> 
>
> Key: KAFKA-4534
> URL: https://issues.apache.org/jira/browse/KAFKA-4534
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> StreamPartitionAssignor only ever updates the partitionsByHostState and 
> metadataWithInternalTopics once. This results in incorrect metadata on 
> rebalances.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2256: KAFKA-4534: StreamPartitionAssignor only ever upda...

2016-12-14 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2256

KAFKA-4534: StreamPartitionAssignor only ever updates the 
partitionsByHostState and metadataWithInternalTopics on first assignment

partitionsByHostState and metadataWithInternalTopics need to be updated on 
each call to onAssignment() otherwise they contain invalid/stale metadata.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka 4534

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2256


commit 3437ff3a8883d22bce09aca14161f3d958c3e2ac
Author: Damian Guy 
Date:   2016-12-14T13:50:55Z

update partitionHostState and metadata onAssignment




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-12-14 Thread Ismael Juma
Hi Rajini,

I think it would definitely be valuable to have a KIP for impersonation.

Ismael

On Wed, Dec 14, 2016 at 4:03 AM, Rajini Sivaram  wrote:

> It would clearly be very useful to enable clients to send requests on
> behalf of multiple users. A separate KIP makes sense, but it may be worth
> thinking through some of the implications now, especially if the main
> interest in delegation tokens comes from its potential to enable
> impersonation.
>
> I understand that delegation tokens are only expected to be used with TLS.
> But the choice of SASL/SCRAM for authentication must be based on a
> requirement to protect the tokenHmac - otherwise you could just use
> SASL/PLAIN. With SASL/SCRAM the tokenHmac is never propagated on-the-wire,
> only a salted-hashed version of it is used in the SASL authentication
> exchange. If impersonation is based on sending tokenHmac in requests, any
> benefit of using SCRAM is lost.
>
> An alternative may be to allow clients to authenticate multiple times using
> SASL and include one of its authenticated principals in each request
> (optionally). I haven't thought it through yet, obviously. But if the
> approach is of interest and no one is working on a KIP for impersonation at
> the moment, I am happy to write one. It may provide something for
> comparison at least.
>
> Thoughts?
>
>
> On Wed, Dec 14, 2016 at 9:53 AM, Manikumar 
> wrote:
>
> > That's a good idea. Authenticating every request with delegation token
> will
> > be useful for
> > impersonation use-cases. But as of now, we are thinking delegation token
> as
> > just another way
> > to authenticate the users. We haven't think through all the use cases
> > related to
> > impersonation or using delegation token for impersonation. We want to
> > handle impersonation
> > (KAFKA-3712) as part of separate KIP.
> >
> > Will that be Ok?
> >
> >
> > On Wed, Dec 14, 2016 at 8:09 AM, Gwen Shapira  wrote:
> >
> > > Thinking out loud here:
> > >
> > > It looks like authentication with a delegation token is going to be
> > > super-cheap, right? We just compare the token to a value in the broker
> > > cache?
> > >
> > > If I understood the KIP correctly, right now it suggests that
> > > authentication happens when establishing the client-broker connection
> (as
> > > normal for Kafka. But perhaps we want to consider authenticating every
> > > request with delegation token (if exists)?
> > >
> > > So a centralized app can create few producers, do the metadata request
> > and
> > > broker discovery with its own user auth, but then use delegation tokens
> > to
> > > allow performing produce/fetch requests as different users? Instead of
> > > having to re-connect for each impersonated user?
> > >
> > > This may over-complicate things quite a bit (basically adding extra
> > > information in every request), but maybe it will be useful for
> > > impersonation use-cases (which seem to drive much of the interest in
> this
> > > KIP)?
> > > Kafka Connect, NiFi and friends can probably use this to share clients
> > > between multiple jobs, tasks, etc.
> > >
> > > What do you think?
> > >
> > > Gwen
> > >
> > > On Tue, Dec 13, 2016 at 12:43 AM, Manikumar  >
> > > wrote:
> > >
> > > > Ashish,
> > > >
> > > > Thank you for reviewing the KIP.  Please see the replies inline.
> > > >
> > > >
> > > > > 1. How to disable delegation token authentication?
> > > > >
> > > > > This can be achieved in various ways, however I think reusing
> > > delegation
> > > > > token secret config for this makes sense here. Avoids creating yet
> > > > another
> > > > > config and forces delegation token users to consciously set the
> > secret.
> > > > If
> > > > > the secret is not set or set to empty string, brokers should turn
> off
> > > > > delegation token support. This will however require a new error
> code
> > to
> > > > > indicate delegation token support is turned off on broker.
> > > > >
> > > >
> > > >   Thanks for the suggestion. Option to turnoff delegation token
> > > > authentication will be useful.
> > > >   I'll update the KIP.
> > > >
> > > >
> > > > >
> > > > > 2. ACLs on delegation token?
> > > > >
> > > > > Do we need to have ACLs defined for tokens? I do not think it buys
> us
> > > > > anything, as delegation token can be treated as impersonation of
> the
> > > > owner.
> > > > > Any thing the owner has permission to do, delegation tokens should
> be
> > > > > allowed to do as well. If so, we probably won't need to return
> > > > > authorization exception error code while creating delegation token.
> > It
> > > > > however would make sense to check renew and expire requests are
> > coming
> > > > from
> > > > > owner or renewers of the token, but that does not require explicit
> > > acls.
> > > > >
> > > >
> > > >
> > > > Yes, We agreed to not have new acl on who can request delegation
> token.
> > > >  I'll update the KIP.
> > > >
> > > >
> > > > >
> > > > > 3. How to restrict max life time of a token?
> > > > >
> > > > > Admins might w

Re: [DISCUSS] KIP-83 - Allow multiple SASL PLAIN authenticated Java clients in a single JVM process

2016-12-14 Thread Ismael Juma
Hi Edoardo,

Yes, you're right, this can be just a JIRA now as there are no publicly
facing changes.

Thanks,
Ismael

On Tue, Dec 13, 2016 at 9:07 AM, Edoardo Comar  wrote:

> Thanks for your review, Ismael.
>
> First, I am no longer sure KIP-83 is worth keeping as KIP, I created it
> just before Rajini's
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+
> configuration+for+Kafka+clients
> With KIP-85 as presented, my proposal has become a simple JIRA, there are
> no interface changes on top of KIP-85.
> So I'll have no objection if you want to retire it as part of your
> cleanup.
>
> As for your comments :
> 1) We can change the map to use the Password object as a key in the
> LoginManager cache, so logging its content won't leak the key.
> Though I can't see why we would log the content of the cache.
>
> 2) If two clients use the same Jaas Config value, they will obtain the
> same LoginManager.
> No new concurrency issue would arise as this happens today with any two
> clients (Producers/Consumers) in the same process.
>
> 3) Based on most jaas.config samples I have seen for kerberos and
> sasl/plain, the text used as key should be no larger than 0.5k.
>
> Please let us know of any other concerns you may have, as
> IBM Message Hub is very eager to have the issue
> https://issues.apache.org/jira/browse/KAFKA-4180 merged in the next
> release (February timeframe 0.10.2 ? 0.11 ?).
> so we're waiting for Rajini's
> https://issues.apache.org/jira/browse/KAFKA-4259 on which our changes are
> based.
>
> thanks
> Edo
> --
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
>
> From:   Ismael Juma 
> To: dev@kafka.apache.org
> Date:   13/12/2016 12:49
> Subject:Re: [DISCUSS] KIP-83 - Allow multiple SASL PLAIN
> authenticated Java clients in a single JVM process
> Sent by:isma...@gmail.com
>
>
>
> Thanks for the KIP. A few comments:
>
> 1. The suggestion is to use the JAAS config value as the key to the map in
> `LoginManager`. The config value can include passwords, so we could
> potentially end up leaking them if we log the keys of `LoginManager`. This
> seems a bit dangerous.
>
> 2. If someone uses the same JAAS config value in two clients, they'll get
> the same `JaasConfig`, which seems fine, but worth mentioning (it means
> that the `JaasConfig` has to be thread-safe).
>
> 3. How big can a JAAS config get? Is it an issue to use it as a map key?
> Probably not given how this is used, but worth covering in the KIP as
> well.
>
> Ismael
>
> On Tue, Sep 27, 2016 at 10:15 AM, Edoardo Comar  wrote:
>
> > Hi,
> > I had a go at a KIP that addresses this JIRA
> > https://issues.apache.org/jira/browse/KAFKA-4180
> > "Shared authentification with multiple actives Kafka
> producers/consumers"
> >
> > which is a limitation of the current Java client that we (IBM
> MessageHub)
> > get asked quite often lately.
> >
> > We will have a go at a PR soon, just as a proof of concept, but as it
> > introduces new public interfaces it needs a KIP.
> >
> > I'll welcome your input.
> >
> > Edo
> > --
> > Edoardo Comar
> > MQ Cloud Technologies
> > eco...@uk.ibm.com
> > +44 (0)1962 81 5576
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> > IBM United Kingdom Limited Registered in England and Wales with number
> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> > 3AU
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with number
> > 741598.
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >
>
>
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>


[jira] [Commented] (KAFKA-4538) Version

2016-12-14 Thread Samuel Durand (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748422#comment-15748422
 ] 

Samuel Durand commented on KAFKA-4538:
--

[~rsivaram] it might , if I don't find the time to investigate before the next 
release (is there a date planned for that ?) I will update that issue 

> Version 
> 
>
> Key: KAFKA-4538
> URL: https://issues.apache.org/jira/browse/KAFKA-4538
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Unix OS, Scala
>Reporter: Samuel Durand
>Priority: Minor
>
> The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from 
> closing by itself. This was not the case with the previous 0.10.0.1
> To close the app you now have to add an explicit system exit.
> {code:title=Bar.scala|borderStyle=solid}
> object App extends App {
>  doSomethingThatCreatesAndUsesAKafkaClient()
>   // Necessary to close the application
>   System.exit(0)
> }
> {code}
> I didn't find out yet if that's because of some leaking process or something 
> else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4538) Version

2016-12-14 Thread Samuel Durand (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748422#comment-15748422
 ] 

Samuel Durand edited comment on KAFKA-4538 at 12/14/16 2:13 PM:


[~rsivaram] it might , if I don't find the time to investigate before the next 
release (is there a date planned for that ?) I will update that issue (a fix 
for that other issue should be in the realease)


was (Author: firens):
[~rsivaram] it might , if I don't find the time to investigate before the next 
release (is there a date planned for that ?) I will update that issue 

> Version 
> 
>
> Key: KAFKA-4538
> URL: https://issues.apache.org/jira/browse/KAFKA-4538
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Unix OS, Scala
>Reporter: Samuel Durand
>Priority: Minor
>
> The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from 
> closing by itself. This was not the case with the previous 0.10.0.1
> To close the app you now have to add an explicit system exit.
> {code:title=Bar.scala|borderStyle=solid}
> object App extends App {
>  doSomethingThatCreatesAndUsesAKafkaClient()
>   // Necessary to close the application
>   System.exit(0)
> }
> {code}
> I didn't find out yet if that's because of some leaking process or something 
> else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4538) Version

2016-12-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4538.

Resolution: Duplicate

Duplicate of KAFKA-4431, please reopen if not.

> Version 
> 
>
> Key: KAFKA-4538
> URL: https://issues.apache.org/jira/browse/KAFKA-4538
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Unix OS, Scala
>Reporter: Samuel Durand
>Priority: Minor
>
> The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from 
> closing by itself. This was not the case with the previous 0.10.0.1
> To close the app you now have to add an explicit system exit.
> {code:title=Bar.scala|borderStyle=solid}
> object App extends App {
>  doSomethingThatCreatesAndUsesAKafkaClient()
>   // Necessary to close the application
>   System.exit(0)
> }
> {code}
> I didn't find out yet if that's because of some leaking process or something 
> else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4431) HeartbeatThread should be a daemon thread

2016-12-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4431:
---
Fix Version/s: 0.10.1.1

> HeartbeatThread should be a daemon thread
> -
>
> Key: KAFKA-4431
> URL: https://issues.apache.org/jira/browse/KAFKA-4431
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: David Judd
>Assignee: Rajini Sivaram
> Fix For: 0.10.1.1, 0.10.2.0
>
>
> We're seeing an issue where an exception inside the main processing loop of a 
> consumer doesn't cause the JVM to exit, as expected (and, in our case, 
> desired). From the thread dump, it appears that what's blocking exit is the 
> "kafka-coordinator-heartbeat-thread", which is not currently a daemon thread. 
> Per the mailing list, it sounds like this is a bug.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4431) HeartbeatThread should be a daemon thread

2016-12-14 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748569#comment-15748569
 ] 

Ismael Juma commented on KAFKA-4431:


I cherry-picked this to the 0.10.1 branch as it's a trivial fix and has user 
visible impact.

> HeartbeatThread should be a daemon thread
> -
>
> Key: KAFKA-4431
> URL: https://issues.apache.org/jira/browse/KAFKA-4431
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: David Judd
>Assignee: Rajini Sivaram
> Fix For: 0.10.1.1, 0.10.2.0
>
>
> We're seeing an issue where an exception inside the main processing loop of a 
> consumer doesn't cause the JVM to exit, as expected (and, in our case, 
> desired). From the thread dump, it appears that what's blocking exit is the 
> "kafka-coordinator-heartbeat-thread", which is not currently a daemon thread. 
> Per the mailing list, it sounds like this is a bug.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4538) Version

2016-12-14 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748574#comment-15748574
 ] 

Ismael Juma commented on KAFKA-4538:


I cherry-picked KAFKA-4431 into the 0.10.1 branch so it will be part of the 
next bug fix release (we're planning to do the next RC soon).

> Version 
> 
>
> Key: KAFKA-4538
> URL: https://issues.apache.org/jira/browse/KAFKA-4538
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Unix OS, Scala
>Reporter: Samuel Durand
>Priority: Minor
>
> The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from 
> closing by itself. This was not the case with the previous 0.10.0.1
> To close the app you now have to add an explicit system exit.
> {code:title=Bar.scala|borderStyle=solid}
> object App extends App {
>  doSomethingThatCreatesAndUsesAKafkaClient()
>   // Necessary to close the application
>   System.exit(0)
> }
> {code}
> I didn't find out yet if that's because of some leaking process or something 
> else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4474) Poor kafka-streams throughput

2016-12-14 Thread Juan Chorro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748575#comment-15748575
 ] 

Juan Chorro commented on KAFKA-4474:


Hi again!

We have been doing more performance tests and we have observed an anomaly 
behavior. In link below you can see all gotten information:

https://docs.google.com/spreadsheets/d/1iVKIp3vGCZKSlByaaEhM5hiA3cNA75zzNGdMNeIMO68/edit?usp=sharing

In all cases each one service has its own node. We have gotten the consumer and 
producer throughtput by JMX protocol with jconsole tool.

We have a synthetic producer that injects ~100K messages per second to 
kafka-streams application and we can seei next cases:

* Case A: We have 1 zookeeper server, 1 kafka-broker and 1 kafka-streams app. 
Also we have two topics, input and output, with 4 partitions each one. We have 
a synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~20K messages per second and in output topic the 
app is producing to ~4K messages per second. Where is others ~16K messages per 
second whether I don't observe a excesive RAM increment?

* Case B: We have 1 zookeeper server, 2 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 2 partitions each one. We have a 
synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~100K messages per second and in output topic 
the app is producing to ~100K messages per second. This case is correct!

* Case C: We have 1 zookeeper server, 2 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 4 partitions each one. We have a 
synthetic producer that injects ~100K nessages per second in input topic, the 
kafka-streams app is consuming ~20K messages per second and in output topic the 
app is producing to ~4K messages per second. This case is the same that Case A 
but with different #kafka-brokers.

* Case D: We have 1 zookeeper server, 4 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 4 partitions each one. We have a 
synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~820K messages per second and in output topic 
the app is producing to ~100K messages per second. In this case as synthetic 
producer as kafka-streams producer have the same throughput but the 
kafka-streams consumer gets ~820K messages per second and I don't know why.

I don't understand why from Case B to Case C the consumer's throughput is lower 
when increase #partitions.

Do you like same like me?
Do I have some wrong concepts?

If you need anything else, feel free asking me for it.

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>Assignee: Eno Thereska
> Attachments: hctop sreenshot.png
>
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed when num.stream.threads is set to #partitions I have best 
> results. Then I have next questions:
> - Why whether I have a topic with #partitions > 1 an

[jira] [Comment Edited] (KAFKA-4474) Poor kafka-streams throughput

2016-12-14 Thread Juan Chorro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748575#comment-15748575
 ] 

Juan Chorro edited comment on KAFKA-4474 at 12/14/16 3:17 PM:
--

Hi again!

We have been doing more performance tests and we have observed an anomaly 
behavior. In link below you can see all gotten information:

https://docs.google.com/spreadsheets/d/1Ywj2nlKoBdu6fX_fkPeY7ebBqXtEo79x9uyIhyZVB7A/edit?usp=sharing

In all cases each one service has its own node. We have gotten the consumer and 
producer throughtput by JMX protocol with jconsole tool.

We have a synthetic producer that injects ~100K messages per second to 
kafka-streams application and we can seei next cases:

* Case A: We have 1 zookeeper server, 1 kafka-broker and 1 kafka-streams app. 
Also we have two topics, input and output, with 4 partitions each one. We have 
a synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~20K messages per second and in output topic the 
app is producing to ~4K messages per second. Where is others ~16K messages per 
second whether I don't observe a excesive RAM increment?

* Case B: We have 1 zookeeper server, 2 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 2 partitions each one. We have a 
synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~100K messages per second and in output topic 
the app is producing to ~100K messages per second. This case is correct!

* Case C: We have 1 zookeeper server, 2 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 4 partitions each one. We have a 
synthetic producer that injects ~100K nessages per second in input topic, the 
kafka-streams app is consuming ~20K messages per second and in output topic the 
app is producing to ~4K messages per second. This case is the same that Case A 
but with different #kafka-brokers.

* Case D: We have 1 zookeeper server, 4 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 4 partitions each one. We have a 
synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~820K messages per second and in output topic 
the app is producing to ~100K messages per second. In this case as synthetic 
producer as kafka-streams producer have the same throughput but the 
kafka-streams consumer gets ~820K messages per second and I don't know why.

I don't understand why from Case B to Case C the consumer's throughput is lower 
when increase #partitions.

Do you like same like me?
Do I have some wrong concepts?

If you need anything else, feel free asking me for it.


was (Author: jjchorrobe):
Hi again!

We have been doing more performance tests and we have observed an anomaly 
behavior. In link below you can see all gotten information:

https://docs.google.com/spreadsheets/d/1iVKIp3vGCZKSlByaaEhM5hiA3cNA75zzNGdMNeIMO68/edit?usp=sharing

In all cases each one service has its own node. We have gotten the consumer and 
producer throughtput by JMX protocol with jconsole tool.

We have a synthetic producer that injects ~100K messages per second to 
kafka-streams application and we can seei next cases:

* Case A: We have 1 zookeeper server, 1 kafka-broker and 1 kafka-streams app. 
Also we have two topics, input and output, with 4 partitions each one. We have 
a synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~20K messages per second and in output topic the 
app is producing to ~4K messages per second. Where is others ~16K messages per 
second whether I don't observe a excesive RAM increment?

* Case B: We have 1 zookeeper server, 2 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 2 partitions each one. We have a 
synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~100K messages per second and in output topic 
the app is producing to ~100K messages per second. This case is correct!

* Case C: We have 1 zookeeper server, 2 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 4 partitions each one. We have a 
synthetic producer that injects ~100K nessages per second in input topic, the 
kafka-streams app is consuming ~20K messages per second and in output topic the 
app is producing to ~4K messages per second. This case is the same that Case A 
but with different #kafka-brokers.

* Case D: We have 1 zookeeper server, 4 kafka-brokers and 1 kafka-streams app. 
We have two topics, input and output, with 4 partitions each one. We have a 
synthetic producer that injects ~100K messages per second in input topic, the 
kafka-streams app is consuming ~820K messages per second and in output topic 
the ap

Re: [DISCUSS] KIP-86: Configurable SASL callback handlers

2016-12-14 Thread Rajini Sivaram
Have added sample callback handlers for PLAIN and SCRAM.

On Tue, Dec 13, 2016 at 4:10 PM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Ismael,
>
> Thank you for the review. I will add an example.
>
> On Tue, Dec 13, 2016 at 1:07 PM, Ismael Juma  wrote:
>
>> Hi Rajini,
>>
>> Thanks for the KIP. I think this is useful and users have asked for
>> something like that. I like that you have a scenarios section, do you
>> think
>> you could provide a rough sketch of what a callback handler would look
>> like
>> for the first 2 scenarios? They seem to be the common ones, so it would
>> help to see a concrete example.
>>
>> Ismael
>>
>> On Tue, Oct 11, 2016 at 7:28 AM, Rajini Sivaram <
>> rajinisiva...@googlemail.com> wrote:
>>
>> > Hi all,
>> >
>> > I have just created KIP-86 make callback handlers in SASL configurable
>> so
>> > that credential providers for SASL/PLAIN (and SASL/SCRAM when it is
>> > implemented) can be used with custom credential callbacks:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 86%3A+Configurable+SASL+callback+handlers
>> >
>> > Comments and suggestions are welcome.
>> >
>> > Thank you...
>> >
>> >
>> > Regards,
>> >
>> > Rajini
>> >
>>
>
>
>
> --
> Regards,
>
> Rajini
>



-- 
Regards,

Rajini


log.retention attribute not working

2016-12-14 Thread Ghosh, Achintya (Contractor)
Hi there,

Any idea why log.retention attribute is not working?  We kept 
log.retention.hours=6 in server.properties but we see old data are not getting 
deleted. We see Dec 9th data/log files are still there.
We are running this in production boxes and if it does not delete the old files 
our storage will be full very soon. Please help on this.

Here is the details of our configuration:


# The minimum age of a log file to be eligible for deletion
log.retention.hours=6

# A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted 
according
# to the retention policies
log.retention.check.interval.ms=30

Thanks
Achintya



Re: [DISCUSS] KIP-86: Configurable SASL callback handlers

2016-12-14 Thread Ismael Juma
Thanks Rajini, that helps. A few comments:

1. The `AuthCallbackHandler` interface already exists and we are making
breaking changes (removing a parameter from `configure` and adding
additional methods). Is the reasoning that it was not a public interface
before? It would be good to clearly separate public versus non-public
interfaces in the security code (and we should tweak Gradle to publish
javadoc for the public ones).

2. It seems like there is an ordering when it comes to the invocation of
callbacks. At least the current code assumes that `NameCallback` is called
first. If I am interpreting this correctly, we should specify that ordering.

3. The approach taken by `ScramCredentialCallback` is different than the
one taken by `PlainAuthenticateCallback`. The former lets the user pass the
credentials information while the latter passes the credentials and lets
the user do the authentication. It would be good to explain the
inconsistency.

4. We reference `ScramCredential` in a few places, so it would be good to
define that class too.

Ismael

On Wed, Dec 14, 2016 at 7:32 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Have added sample callback handlers for PLAIN and SCRAM.
>
> On Tue, Dec 13, 2016 at 4:10 PM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Ismael,
> >
> > Thank you for the review. I will add an example.
> >
> > On Tue, Dec 13, 2016 at 1:07 PM, Ismael Juma  wrote:
> >
> >> Hi Rajini,
> >>
> >> Thanks for the KIP. I think this is useful and users have asked for
> >> something like that. I like that you have a scenarios section, do you
> >> think
> >> you could provide a rough sketch of what a callback handler would look
> >> like
> >> for the first 2 scenarios? They seem to be the common ones, so it would
> >> help to see a concrete example.
> >>
> >> Ismael
> >>
> >> On Tue, Oct 11, 2016 at 7:28 AM, Rajini Sivaram <
> >> rajinisiva...@googlemail.com> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I have just created KIP-86 make callback handlers in SASL configurable
> >> so
> >> > that credential providers for SASL/PLAIN (and SASL/SCRAM when it is
> >> > implemented) can be used with custom credential callbacks:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 86%3A+Configurable+SASL+callback+handlers
> >> >
> >> > Comments and suggestions are welcome.
> >> >
> >> > Thank you...
> >> >
> >> >
> >> > Regards,
> >> >
> >> > Rajini
> >> >
> >>
> >
> >
> >
> > --
> > Regards,
> >
> > Rajini
> >
>
>
>
> --
> Regards,
>
> Rajini
>


[jira] [Updated] (KAFKA-4539) StreamThread is not correctly creating StandbyTasks

2016-12-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4539:
--
Summary: StreamThread is not correctly creating  StandbyTasks  (was: NPE in 
StreamThread when creating StandbyTasks)

> StreamThread is not correctly creating  StandbyTasks
> 
>
> Key: KAFKA-4539
> URL: https://issues.apache.org/jira/browse/KAFKA-4539
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> Fails because {{createStandbyTask(..)}} can return null fi the topology for 
> the {{TaskId}} doesn't have any state stores.
> {code}
> [2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> kafka-music-charts failed on partition assignment 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4539) StreamThread is not correctly creating StandbyTasks

2016-12-14 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4539:
--
Description: 
Fails because {{createStandbyTask(..)}} can return null fi the topology for the 
{{TaskId}} doesn't have any state stores.

{code}
[2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
kafka-music-charts failed on partition assignment 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
{code}

Also fails because the checkpointedOffsets from the newly created 
{{StandbyTask}} aren't added to the offsets map, so the partitions don't get 
assigned. We then get:


  was:
Fails because {{createStandbyTask(..)}} can return null fi the topology for the 
{{TaskId}} doesn't have any state stores.

{code}
[2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
kafka-music-charts failed on partition assignment 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:1241)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1188)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:915)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$400(StreamThread.java:72)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:239)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1022)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:987)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:568)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:357)
{code}


> StreamThread is not correctly creating  StandbyTasks
> 
>
> Key: KAFKA-4539
> URL: https://issues.apache.org/jira/browse/KAFKA-4539
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> Fails because {{createStandbyTask(..)}} can return null fi the topology for 
> the {{TaskId}} doesn't have any state stores.
> {code}
> [2016-12-14 12:20:29,758] ERROR [StreamThread-1] User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> kafka-music-charts failed on partition assignment 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> java.lang.NullP

New question / request on kafka consumer

2016-12-14 Thread Costache, Vlad
Hello,

We are trying to make a consumer for kafka, (client code alone and camel 
integrated) and we ended in a blocking point.
Can you please give us an advice, or any other idea?

Our problem:

-  We create a kafka consumer that connects to a wrong server (wrong 
ip/port), and the consumer get stuck in "poll" method even the connection is 
not created.

-  We tried also with camel, but the same problem (as the same kafka 
client is called)

-  It seems that there is a bug in kafka java client that the 
connection ends up in a loop

-  For producer everything is fine.

Do you have any advice, or can you confirm that there is a bug? Do you plan to 
fix this? Our production code will be with camel, so there will be needed an 
exception to be thrown so we can make our data error handling.

How can we make the consumer throw exception if the connection to server is not 
created successfully or the connection is lost at some point?


Our environment:
Standalone:

  
  org.apache.kafka
  kafka_2.11
  0.10.0.0
  

With camel:
  
 org.apache.camel
 camel-kafka
 2.17.0.redhat-630187
  


The dummy code example is attached.

Or camel:

   //server do not exist

from("kafka:10.97.210.222:8093?topic=testTopic_mip133&groupId=testing&autoOffsetReset=earliest&consumersCount=1").process(new
 Processor(
) {

@Override
public void process(Exchange exchange) throws Exception {
LOG.info(new MsbHeaderImpl(), "Am primit header: " + 
exchange.getIn().getHeaders());
LOG.info(new MsbHeaderImpl(), "Am primit body: " + 
exchange.getIn().getBody());

}
});


Kafka server version: kafka_2.10-0.10.1.0


Thanks,
Vlad

Geschaeftsanschrift/Business Address: METRO SYSTEMS GmbH, Metro-Strasse 12, 
40235 Duesseldorf, Germany
Aufsichtsrat/Supervisory Board: Heiko Hutmacher (Vorsitzender/ Chairman)
Geschaeftsfuehrung/Management Board: Dr. Dirk Toepfer (Vorsitzender/CEO), Wim 
van Herwijnen
Sitz Duesseldorf, Amtsgericht Duesseldorf, HRB 18232/Registered Office 
Duesseldorf, Commercial Register of the Duesseldorf Local Court, HRB 18232

---
Betreffend Mails von *@metrosystems.net

Die in dieser E-Mail enthaltenen Nachrichten und Anhaenge sind ausschliesslich 
fuer den bezeichneten Adressaten bestimmt.
Sie koennen rechtlich geschuetzte, vertrauliche Informationen enthalten. Falls 
Sie nicht der bezeichnete Empfaenger oder zum Empfang dieser E-Mail nicht 
berechtigt sind, ist die Verwendung, Vervielfaeltigung oder Weitergabe der 
Nachrichten und Anhaenge untersagt. Falls Sie diese E-Mail irrtuemlich erhalten 
haben, informieren Sie bitte unverzueglich den Absender und vernichten Sie die 
E-Mail.

Regarding mails from *@metrosystems.net

This e-mail message and any attachment are intended exclusively for the named 
addressee.
They may contain confidential information which may also be protected by 
professional secrecy. Unless you are the named addressee (or authorised to 
receive for the addressee) you may not copy or use this message or any 
attachment or disclose the contents to anyone else. If this e-mail was sent to 
you by mistake please notify the sender immediately and delete this e-mail.


[jira] [Commented] (KAFKA-4474) Poor kafka-streams throughput

2016-12-14 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748741#comment-15748741
 ] 

Eno Thereska commented on KAFKA-4474:
-

[~jjchorrobe] can I confirm again that this is all running in one machine? So 
when you say 2 brokers, they are both running on the same machine right? Also 
can I confirm that you have an HDD and not and SSD? Thanks.

> Poor kafka-streams throughput
> -
>
> Key: KAFKA-4474
> URL: https://issues.apache.org/jira/browse/KAFKA-4474
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Juan Chorro
>Assignee: Eno Thereska
> Attachments: hctop sreenshot.png
>
>
> Hi! 
> I'm writing because I have a worry about kafka-streams throughput.
> I have only a kafka-streams application instance that consumes from 'input' 
> topic, prints on the screen and produces in 'output' topic. All topics have 4 
> partitions. As can be observed the topology is very simple.
> I produce 120K messages/second to 'input' topic, when I measure the 'output' 
> topic I detect that I'm receiving ~4K messages/second. I had next 
> configuration (Remaining parameters by default):
> application.id: myApp
> bootstrap.servers: localhost:9092
> zookeeper.connect: localhost:2181
> num.stream.threads: 1
> I was doing proofs and tests without success, but when I created a new 
> 'input' topic with 1 partition (Maintain 'output' topic with 4 partitions) I 
> got in 'output' topic 120K messages/seconds.
> I have been doing some performance tests and proof with next cases (All 
> topics have 4 partitions in all cases):
> Case A - 1 Instance:
> - With num.stream.threads set to 1 I had ~3785 messages/second
> - With num.stream.threads set to 2 I had ~3938 messages/second
> - With num.stream.threads set to 4 I had ~120K messages/second
> Case B - 2 Instances:
> - With num.stream.threads set to 1 I had ~3930 messages/second for each 
> instance (And throughput ~8K messages/second)
> - With num.stream.threads set to 2 I had ~3945 messages/second for each 
> instance (And more or less same throughput that with num.stream.threads set 
> to 1)
> Case C - 4 Instances
> - With num.stream.threads set to 1 I had 3946 messages/seconds for each 
> instance (And throughput ~17K messages/second):
> As can be observed when num.stream.threads is set to #partitions I have best 
> results. Then I have next questions:
> - Why whether I have a topic with #partitions > 1 and with 
> num.streams.threads is set to 1 I have ~4K messages/second always?
> - In case C. 4 instances with num.stream.threads set to 1 should be better 
> that 1 instance with num.stream.threads set to 4. Is corrects this 
> supposition?
> This is the kafka-streams application that I use: 
> https://gist.github.com/Chorro/5522ec4acd1a005eb8c9663da86f5a18



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4538) Version

2016-12-14 Thread Samuel Durand (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748743#comment-15748743
 ] 

Samuel Durand commented on KAFKA-4538:
--

thanks for the info [~ijuma] will follow that closely

> Version 
> 
>
> Key: KAFKA-4538
> URL: https://issues.apache.org/jira/browse/KAFKA-4538
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
> Environment: Unix OS, Scala
>Reporter: Samuel Durand
>Priority: Minor
>
> The new KafkaClient for kafka 0.10.1.0 prevents a simple Scala app from 
> closing by itself. This was not the case with the previous 0.10.0.1
> To close the app you now have to add an explicit system exit.
> {code:title=Bar.scala|borderStyle=solid}
> object App extends App {
>  doSomethingThatCreatesAndUsesAKafkaClient()
>   // Necessary to close the application
>   System.exit(0)
> }
> {code}
> I didn't find out yet if that's because of some leaking process or something 
> else.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4180) Shared authentication with multiple active Kafka producers/consumers

2016-12-14 Thread Edoardo Comar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748758#comment-15748758
 ] 

Edoardo Comar commented on KAFKA-4180:
--

I have updated the PR based on the discussion in the mailing list - namely the 
key used to cache `LoginManager` instances.
I have retired KIP-83, the PR that closes this issue is mostly based on KIP-85 
and associated https://issues.apache.org/jira/browse/KAFKA-4259

> Shared authentication with multiple active Kafka producers/consumers
> 
>
> Key: KAFKA-4180
> URL: https://issues.apache.org/jira/browse/KAFKA-4180
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , security
>Affects Versions: 0.10.0.1
>Reporter: Guillaume Grossetie
>Assignee: Mickael Maison
>  Labels: authentication, jaas, loginmodule, plain, producer, 
> sasl, user
>
> I'm using Kafka 0.10.0.1 with an SASL authentication on the client:
> {code:title=kafka_client_jaas.conf|borderStyle=solid}
> KafkaClient {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="guillaume"
> password="secret";
> };
> {code}
> When using multiple Kafka producers the authentification is shared [1]. In 
> other words it's not currently possible to have multiple Kafka producers in a 
> JVM process.
> Am I missing something ? How can I have multiple active Kafka producers with 
> different credentials ?
> My use case is that I have an application that send messages to multiples 
> clusters (one cluster for logs, one cluster for metrics, one cluster for 
> business data).
> [1] 
> https://github.com/apache/kafka/blob/69ebf6f7be2fc0e471ebd5b7a166468017ff2651/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java#L35



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-83 - Allow multiple SASL PLAIN authenticated Java clients in a single JVM process

2016-12-14 Thread Edoardo Comar
Hi Ismael, I have retired the KIP and updated the PR. 
Can't wait for https://issues.apache.org/jira/browse/KAFKA-4259 to be 
merged :-)
--
Edoardo Comar
IBM MessageHub
eco...@uk.ibm.com
IBM UK Ltd, Hursley Park, SO21 2JN

IBM United Kingdom Limited Registered in England and Wales with number 
741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 
3AU



From:   Ismael Juma 
To: dev@kafka.apache.org
Date:   14/12/2016 14:01
Subject:Re: [DISCUSS] KIP-83 - Allow multiple SASL PLAIN 
authenticated Java clients in a single JVM process
Sent by:isma...@gmail.com



Hi Edoardo,

Yes, you're right, this can be just a JIRA now as there are no publicly
facing changes.

Thanks,
Ismael

On Tue, Dec 13, 2016 at 9:07 AM, Edoardo Comar  wrote:

> Thanks for your review, Ismael.
>
> First, I am no longer sure KIP-83 is worth keeping as KIP, I created it
> just before Rajini's
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+
> configuration+for+Kafka+clients
> With KIP-85 as presented, my proposal has become a simple JIRA, there 
are
> no interface changes on top of KIP-85.
> So I'll have no objection if you want to retire it as part of your
> cleanup.
>
> As for your comments :
> 1) We can change the map to use the Password object as a key in the
> LoginManager cache, so logging its content won't leak the key.
> Though I can't see why we would log the content of the cache.
>
> 2) If two clients use the same Jaas Config value, they will obtain the
> same LoginManager.
> No new concurrency issue would arise as this happens today with any two
> clients (Producers/Consumers) in the same process.
>
> 3) Based on most jaas.config samples I have seen for kerberos and
> sasl/plain, the text used as key should be no larger than 0.5k.
>
> Please let us know of any other concerns you may have, as
> IBM Message Hub is very eager to have the issue
> https://issues.apache.org/jira/browse/KAFKA-4180 merged in the next
> release (February timeframe 0.10.2 ? 0.11 ?).
> so we're waiting for Rajini's
> https://issues.apache.org/jira/browse/KAFKA-4259 on which our changes 
are
> based.
>
> thanks
> Edo
> --
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. 
PO6
> 3AU
>
>
>
> From:   Ismael Juma 
> To: dev@kafka.apache.org
> Date:   13/12/2016 12:49
> Subject:Re: [DISCUSS] KIP-83 - Allow multiple SASL PLAIN
> authenticated Java clients in a single JVM process
> Sent by:isma...@gmail.com
>
>
>
> Thanks for the KIP. A few comments:
>
> 1. The suggestion is to use the JAAS config value as the key to the map 
in
> `LoginManager`. The config value can include passwords, so we could
> potentially end up leaking them if we log the keys of `LoginManager`. 
This
> seems a bit dangerous.
>
> 2. If someone uses the same JAAS config value in two clients, they'll 
get
> the same `JaasConfig`, which seems fine, but worth mentioning (it means
> that the `JaasConfig` has to be thread-safe).
>
> 3. How big can a JAAS config get? Is it an issue to use it as a map key?
> Probably not given how this is used, but worth covering in the KIP as
> well.
>
> Ismael
>
> On Tue, Sep 27, 2016 at 10:15 AM, Edoardo Comar  
wrote:
>
> > Hi,
> > I had a go at a KIP that addresses this JIRA
> > https://issues.apache.org/jira/browse/KAFKA-4180
> > "Shared authentification with multiple actives Kafka
> producers/consumers"
> >
> > which is a limitation of the current Java client that we (IBM
> MessageHub)
> > get asked quite often lately.
> >
> > We will have a go at a PR soon, just as a proof of concept, but as it
> > introduces new public interfaces it needs a KIP.
> >
> > I'll welcome your input.
> >
> > Edo
> > --
> > Edoardo Comar
> > MQ Cloud Technologies
> > eco...@uk.ibm.com
> > +44 (0)1962 81 5576
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> > IBM United Kingdom Limited Registered in England and Wales with number
> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> > 3AU
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with 
number
> > 741598.
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >
>
>
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 
3AU
>



Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


[jira] [Commented] (KAFKA-4487) Tests should be run in Jenkins with INFO or DEBUG level

2016-12-14 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748788#comment-15748788
 ] 

Colin P. McCabe commented on KAFKA-4487:


Good idea.  +1 for Jenkins running tests at a higher log level

> Tests should be run in Jenkins with INFO or DEBUG level
> ---
>
> Key: KAFKA-4487
> URL: https://issues.apache.org/jira/browse/KAFKA-4487
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
> Fix For: 0.10.2.0
>
>
> KAFKA-4483 is an example of what can be missed by running them at ERROR 
> level. Worse than that would be subtle issues that would escape detection 
> altogether.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka on Ubuntu Core and Snap store

2016-12-14 Thread Colin McCabe
Hi Michael,

"snapcraft" sounds like an interesting idea.  I'm curious what you see
as the advantage over Docker images.  Does the snap file contain the
Java runtime?  Kafka, like most JDK projects, already bundles its
library dependencies internally, so I don't think snapcraft is adding
quite as much value there as it might for a C/C++ application.

Is there a Maven or Gradle plugin that could generate this automatically
from the install tar.gz file the project already maintains?

best,
Colin


On Tue, Dec 6, 2016, at 06:04, Michael Hall wrote:
> Hello all,
> 
> My name is Michael Hall, I work at Canonical on the community team, and
> recently I've been working with the snapcraft[1] developers to help
> upstreams learn about and start using our new packaging format.
> 
> Yesterday I spent a couple of hours looking at Kafka and building an
> example .snap package for it. It now works, at least for the basics, and
> I was able to follow the quickstart guide[2] after installing it. I'd
> like to make this a contribution to Kafka, and get some developer help
> to finish it up.
> 
> For anybody who wants to try it, you can download it[3] and install it
> with "sudo snap install --dangerous kafka_0.10.1_amd64.snap". It will
> automatically start zookeeper and kafka-server as systemd services, and
> makes the command "/snap/bin/kafka.topics" available to call from the
> command line.
> 
> I have attached the snapcraft.yaml used to build the snap, if you're on
> Ubuntu 16.04 or later (or run it in a VM/docker) you can build it
> yourself by calling "snapcraft" from the same directory as this file.
> 
> [1] http://snapcraft.io/
> [2] https://kafka.apache.org/quickstart
> [3] http://people.ubuntu.com/~mhall119/snaps/kafka_0.10.1_amd64.snap
> 
> -- 
> Michael Hall
> mhall...@gmail.com
> Email had 1 attachment:
> + snapcraft.yaml
>   2k (application/x-yaml)


[jira] [Created] (KAFKA-4540) Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created

2016-12-14 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4540:
-

 Summary: Suspended tasks that are not assigned to the StreamThread 
need to be closed before new active and standby tasks are created
 Key: KAFKA-4540
 URL: https://issues.apache.org/jira/browse/KAFKA-4540
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Damian Guy
Assignee: Damian Guy
 Fix For: 0.10.2.0


When partition assignment happens we first try and add the active tasks and 
then add the standby tasks. The problem with this is that a new active task 
might already be an existing suspended standby task. if this is the case then 
when the active task initialises it will throw an exception from RocksDB:
{{Caused by: org.rocksdb.RocksDBException: IO error: lock 
/tmp/kafka-streams-7071/kafka-music-charts/1_1/rocksdb/all-songs/LOCK: No locks 
available}}

We need to make sure we have removed an closed any no-longer assigned Suspended 
tasks before creating new tasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-48 Support for delegation tokens as an authentication mechanism

2016-12-14 Thread Harsha Chintalapani
@Gwen @Mani  Not sure why we want to authenticate at every request. Even if
the token exchange is cheap it still a few calls that need to go through
round trip.  Impersonation doesn't require authentication for every
request.

"So a centralized app can create few producers, do the metadata request and
broker discovery with its own user auth, but then use delegation tokens to
allow performing produce/fetch requests as different users? Instead of
having to re-connect for each impersonated user?"

Yes. But what we will have is this centralized user as impersonation user
on behalf of other users. When it authenticates initially we will create a
"Subject" and from there on wards centralized user can do
Subject.doAsPrivileged
on behalf, other users.
On the server side, we can retrieve two principals out of this one is the
authenticated user (centralized user) and another is impersonated user. We
will first check if the authenticated user allowed to impersonate and then
move on to check if the user Alice has access to the topic "X" to
read/write.

@Rajini Intention of this KIP is to support token auth via SASL/SCRAM, not
just with TLS.  What you raised is a good point let me take a look and add
details.

It will be easier to add impersonation once we reach agreement on this KIP.


On Wed, Dec 14, 2016 at 5:51 AM Ismael Juma  wrote:

> Hi Rajini,
>
> I think it would definitely be valuable to have a KIP for impersonation.
>
> Ismael
>
> On Wed, Dec 14, 2016 at 4:03 AM, Rajini Sivaram 
> wrote:
>
> > It would clearly be very useful to enable clients to send requests on
> > behalf of multiple users. A separate KIP makes sense, but it may be worth
> > thinking through some of the implications now, especially if the main
> > interest in delegation tokens comes from its potential to enable
> > impersonation.
> >
> > I understand that delegation tokens are only expected to be used with
> TLS.
> > But the choice of SASL/SCRAM for authentication must be based on a
> > requirement to protect the tokenHmac - otherwise you could just use
> > SASL/PLAIN. With SASL/SCRAM the tokenHmac is never propagated
> on-the-wire,
> > only a salted-hashed version of it is used in the SASL authentication
> > exchange. If impersonation is based on sending tokenHmac in requests, any
> > benefit of using SCRAM is lost.
> >
> > An alternative may be to allow clients to authenticate multiple times
> using
> > SASL and include one of its authenticated principals in each request
> > (optionally). I haven't thought it through yet, obviously. But if the
> > approach is of interest and no one is working on a KIP for impersonation
> at
> > the moment, I am happy to write one. It may provide something for
> > comparison at least.
> >
> > Thoughts?
> >
> >
> > On Wed, Dec 14, 2016 at 9:53 AM, Manikumar 
> > wrote:
> >
> > > That's a good idea. Authenticating every request with delegation token
> > will
> > > be useful for
> > > impersonation use-cases. But as of now, we are thinking delegation
> token
> > as
> > > just another way
> > > to authenticate the users. We haven't think through all the use cases
> > > related to
> > > impersonation or using delegation token for impersonation. We want to
> > > handle impersonation
> > > (KAFKA-3712) as part of separate KIP.
> > >
> > > Will that be Ok?
> > >
> > >
> > > On Wed, Dec 14, 2016 at 8:09 AM, Gwen Shapira 
> wrote:
> > >
> > > > Thinking out loud here:
> > > >
> > > > It looks like authentication with a delegation token is going to be
> > > > super-cheap, right? We just compare the token to a value in the
> broker
> > > > cache?
> > > >
> > > > If I understood the KIP correctly, right now it suggests that
> > > > authentication happens when establishing the client-broker connection
> > (as
> > > > normal for Kafka. But perhaps we want to consider authenticating
> every
> > > > request with delegation token (if exists)?
> > > >
> > > > So a centralized app can create few producers, do the metadata
> request
> > > and
> > > > broker discovery with its own user auth, but then use delegation
> tokens
> > > to
> > > > allow performing produce/fetch requests as different users? Instead
> of
> > > > having to re-connect for each impersonated user?
> > > >
> > > > This may over-complicate things quite a bit (basically adding extra
> > > > information in every request), but maybe it will be useful for
> > > > impersonation use-cases (which seem to drive much of the interest in
> > this
> > > > KIP)?
> > > > Kafka Connect, NiFi and friends can probably use this to share
> clients
> > > > between multiple jobs, tasks, etc.
> > > >
> > > > What do you think?
> > > >
> > > > Gwen
> > > >
> > > > On Tue, Dec 13, 2016 at 12:43 AM, Manikumar <
> manikumar.re...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Ashish,
> > > > >
> > > > > Thank you for reviewing the KIP.  Please see the replies inline.
> > > > >
> > > > >
> > > > > > 1. How to disable delegation token authentication?
> > > > > >
> > > > > > This can b

Build failed in Jenkins: kafka-0.10.1-jdk7 #107

2016-12-14 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-4431; Make consumer heartbeat thread a daemon thread

--
[...truncated 14289 lines...]
org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest > 
shouldThrowInvalidStoreExceptionOnRangeDuringRebalance PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest > 
shouldSupportRangeAcrossMultipleKVStores STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest > 
shouldSupportRangeAcrossMultipleKVStores PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest > 
shouldSupportAllAcrossMultipleStores STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest > 
shouldSupportAllAcrossMultipleStores PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldNotGetValuesFromOtherStores STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldNotGetValuesFromOtherStores PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldReturnEmptyIteratorIfNoData STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldReturnEmptyIteratorIfNoData PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldThrowInvalidStateStoreExceptionIfFetchThrows STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldThrowInvalidStateStoreExceptionIfFetchThrows PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldFindValueForKeyWhenMultiStores STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldFindValueForKeyWhenMultiStores PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldThrowInvalidStateStoreExceptionOnRebalance STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldThrowInvalidStateStoreExceptionOnRebalance PASSED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldFetchValuesFromWindowStore STARTED

org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest > 
shouldFetchValuesFromWindowStore PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldPeekNext STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldPeekNext PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldPeekAndIterate STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldPeekAndIterate PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingWindowIteratorTest > 
shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled PASSED

org.apache.kafka.streams.state.internals.StoreChangeLoggerTest > testAddRemove 
STARTED

org.apache.kafka.streams.state.internals.StoreChangeLoggerTest > testAddRemove 
PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldPeekNext STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldPeekNext PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldPeekAndIterate STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldPeekAndIterate PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled PASSED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled STARTED

org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIteratorTest 
> shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testEvict 
STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testEvict 
PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testSize 
STARTED

org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreTest > testSize 
PASSED

org.apache.kafka.streams.state.internals.InMemoryLRUC

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-14 Thread radai
arent control messages getting pushed as their own top level protocol
change (and a fairly massive one) for the transactions KIP ?

On Tue, Dec 13, 2016 at 5:54 PM, Matthias J. Sax 
wrote:

> Hi,
>
> I want to add a completely new angle to this discussion. For this, I
> want to propose an extension for the headers feature that enables new
> uses cases -- and those new use cases might convince people to support
> headers (of course including the larger scoped proposal).
>
> Extended Proposal:
>
> Allow messages with a certain header key to be special "control
> messages" (w/ o w/o payload) that are not exposed to an application via
> .poll().
>
> Thus, a consumer client would automatically skip over those messages. If
> an application knows about embedded control messages, it can "sing up"
> to those messages by the consumer client and either get a callback or
> the consumer auto-drop for this messages gets disabled (allowing to
> consumer those messages via poll()).
>
> (The details need further considerations/discussion. I just want to
> sketch the main idea.)
>
> Usage:
>
> There is a shared topic (ie, used by multiple applications) and a
> producer application wants to embed a special message in the topic for a
> dedicated consumer application. Because only one application will
> understand this message, it cannot be a regular message as this would
> break all applications that do not understand this message. The producer
> application would set a special metadata key and no consumer application
> would see this control message by default because they did not enable
> their consumer client to return this message in poll() (and the client
> would just drop this message with special metadata key). Only the single
> application that should receive this message, will subscribe to this
> message on its consumer client and process it.
>
>
> Concrete Use Case: Kafka Streams
>
> In Kafka Streams, we would like to propagate "control messages" from
> subtopology to subtopology. There are multiple scenarios for which this
> would be useful. For example, currently we do not guarantee a
> "consistent shutdown" of an application. By this, I mean that input
> records might not be completely processed by the whole topology because
> the application shutdown happens "in between" and an intermediate result
> topic gets "stock" in an intermediate topic. Thus, a user would see an
> committed offset of the source topic of the application, but no
> corresponding result record in the output topic.
>
> Having "shutdown markers" would allow us, to first stop the upstream
> subtopology and write this marker into the intermediate topic and the
> downstream subtopology would only shut down itself after is sees the
> "shutdown marker". Thus, we can guarantee on shutdown, that no
> "in-flight" messages got stuck in intermediate topics.
>
>
> A similar usage would be for KIP-95 (Incremental Batch Processing).
> There was a discussion about the proposed metadata topic, and we could
> avoid this metadata topic if we would have "control messages".
>
>
> Right now, we cannot insert an "application control message" because
> Kafka Streams does not own all topics it read/writes and thus might
> break other consumer application (as described above) if we inject
> random messages that are not understood by other apps.
>
>
> Of course, one can work around "embedded control messaged" by using an
> additional topic to propagate control messaged between application (as
> suggestion in KIP-95 via a metadata topic for Kafka Streams). But there
> are major concerns about adding this metadata topic in the KIP and this
> shows that other application that need a similar pattern might profit
> from topic embedded "control messages", too.
>
>
> One last important consideration: those "control messages" are used for
> client to client communication and are not understood by the broker.
> Thus, those messages should not be enabled within the message format
> (c.f. tombstone flag -- KIP-87). However, "client land" record headers
> would be a nice way to implement them. Because KIP-82 did consider key
> namespaces for metatdata keys, this extension should not be an own KIP
> but should be included in KIP-82 to reserve a namespace for "control
> message" in the first place.
>
>
> Sorry for the long email... Looking forward to your feedback.
>
>
> -Matthias
>
>
>
>
>
>
>
>
>
> On 12/8/16 12:12 AM, Michael Pearce wrote:
> > Hi Jun
> >
> > 100) each time a transaction exits a jvm for a remote system (HTTP/JMS/
> Hopefully one day kafka) the APM tools stich in a unique id (though I
> believe it contains the end2end uuid embedded in this id), on receiving the
> message at the receiving JVM the apm code takes this out, and continues its
> tracing on the that new thread. Both JVM’s (and other languages the APM
> tool supports) send this data async back to the central controllers where
> the stiching togeather occurs. For this they need some header space for
> them 

[jira] [Commented] (KAFKA-4529) tombstone may be removed earlier than it should

2016-12-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748966#comment-15748966
 ] 

Jun Rao commented on KAFKA-4529:


Committed [~becket_qin]'s patch to 0.10.1 branch. Leaving the jira open until 
trunk is fixed.

> tombstone may be removed earlier than it should
> ---
>
> Key: KAFKA-4529
> URL: https://issues.apache.org/jira/browse/KAFKA-4529
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Jun Rao
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.1
>
>
> As part of KIP-33, we introduced a regression on how tombstone is removed in 
> a compacted topic. We want to delay the removal of a tombstone to avoid the 
> case that a reader first reads a non-tombstone message on a key and then 
> doesn't see the tombstone for the key because it's deleted too quickly. So, a 
> tombstone is supposed to only be removed from a compacted topic after the 
> tombstone is part of the cleaned portion of the log after delete.retention.ms.
> Before KIP-33, deleteHorizonMs in LogCleaner is calculated based on the last 
> modified time, which is monotonically increasing from old to new segments. 
> With KIP-33, deleteHorizonMs is calculated based on the message timestamp, 
> which is not necessarily monotonically increasing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-12-14 Thread Matthias J. Sax
Yes and no. I did overload the term "control message".

EOS control messages are for client-broker communication and thus never
exposed to any application. And I think this is a good design because
broker needs to understand those control messages. Thus, this should be
a protocol change.

The type of control messages I have in mind are for client-client
(application-application) communication and the broker is agnostic to
them. Thus, it should not be a protocol change.


-Matthias



On 12/14/16 9:42 AM, radai wrote:
> arent control messages getting pushed as their own top level protocol
> change (and a fairly massive one) for the transactions KIP ?
> 
> On Tue, Dec 13, 2016 at 5:54 PM, Matthias J. Sax 
> wrote:
> 
>> Hi,
>>
>> I want to add a completely new angle to this discussion. For this, I
>> want to propose an extension for the headers feature that enables new
>> uses cases -- and those new use cases might convince people to support
>> headers (of course including the larger scoped proposal).
>>
>> Extended Proposal:
>>
>> Allow messages with a certain header key to be special "control
>> messages" (w/ o w/o payload) that are not exposed to an application via
>> .poll().
>>
>> Thus, a consumer client would automatically skip over those messages. If
>> an application knows about embedded control messages, it can "sing up"
>> to those messages by the consumer client and either get a callback or
>> the consumer auto-drop for this messages gets disabled (allowing to
>> consumer those messages via poll()).
>>
>> (The details need further considerations/discussion. I just want to
>> sketch the main idea.)
>>
>> Usage:
>>
>> There is a shared topic (ie, used by multiple applications) and a
>> producer application wants to embed a special message in the topic for a
>> dedicated consumer application. Because only one application will
>> understand this message, it cannot be a regular message as this would
>> break all applications that do not understand this message. The producer
>> application would set a special metadata key and no consumer application
>> would see this control message by default because they did not enable
>> their consumer client to return this message in poll() (and the client
>> would just drop this message with special metadata key). Only the single
>> application that should receive this message, will subscribe to this
>> message on its consumer client and process it.
>>
>>
>> Concrete Use Case: Kafka Streams
>>
>> In Kafka Streams, we would like to propagate "control messages" from
>> subtopology to subtopology. There are multiple scenarios for which this
>> would be useful. For example, currently we do not guarantee a
>> "consistent shutdown" of an application. By this, I mean that input
>> records might not be completely processed by the whole topology because
>> the application shutdown happens "in between" and an intermediate result
>> topic gets "stock" in an intermediate topic. Thus, a user would see an
>> committed offset of the source topic of the application, but no
>> corresponding result record in the output topic.
>>
>> Having "shutdown markers" would allow us, to first stop the upstream
>> subtopology and write this marker into the intermediate topic and the
>> downstream subtopology would only shut down itself after is sees the
>> "shutdown marker". Thus, we can guarantee on shutdown, that no
>> "in-flight" messages got stuck in intermediate topics.
>>
>>
>> A similar usage would be for KIP-95 (Incremental Batch Processing).
>> There was a discussion about the proposed metadata topic, and we could
>> avoid this metadata topic if we would have "control messages".
>>
>>
>> Right now, we cannot insert an "application control message" because
>> Kafka Streams does not own all topics it read/writes and thus might
>> break other consumer application (as described above) if we inject
>> random messages that are not understood by other apps.
>>
>>
>> Of course, one can work around "embedded control messaged" by using an
>> additional topic to propagate control messaged between application (as
>> suggestion in KIP-95 via a metadata topic for Kafka Streams). But there
>> are major concerns about adding this metadata topic in the KIP and this
>> shows that other application that need a similar pattern might profit
>> from topic embedded "control messages", too.
>>
>>
>> One last important consideration: those "control messages" are used for
>> client to client communication and are not understood by the broker.
>> Thus, those messages should not be enabled within the message format
>> (c.f. tombstone flag -- KIP-87). However, "client land" record headers
>> would be a nice way to implement them. Because KIP-82 did consider key
>> namespaces for metatdata keys, this extension should not be an own KIP
>> but should be included in KIP-82 to reserve a namespace for "control
>> message" in the first place.
>>
>>
>> Sorry for the long email... Looking forward to your feedback.
>>
>>
>>

[jira] [Assigned] (KAFKA-3714) Allow users greater access to register custom streams metrics

2016-12-14 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3714:
---

Assignee: Eno Thereska  (was: Guozhang Wang)

> Allow users greater access to register custom streams metrics
> -
>
> Key: KAFKA-3714
> URL: https://issues.apache.org/jira/browse/KAFKA-3714
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Eno Thereska
>Priority: Minor
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Copying in some discussion that originally appeared in 
> https://github.com/apache/kafka/pull/1362#issuecomment-219064302
> Kafka Streams is largely a higher-level abstraction on top of producers and 
> consumers, and it seems sensible to match the KafkaStreams interface to that 
> of KafkaProducer and KafkaConsumer where possible. For producers and 
> consumers, the metric registry is internal and metrics are only exposed as an 
> unmodifiable map. This allows users to access client metric values for use in 
> application health checks, etc., but doesn't allow them to register new 
> metrics.
> That approach seems reasonable if we assume that a user interested in 
> defining custom metrics is already going to be using a separate metrics 
> library. In such a case, users will likely find it easier to define metrics 
> using whatever library they're familiar with rather than learning the API for 
> Kafka's Metrics class. Is this a reasonable assumption?
> If we want to expose the Metrics instance so that users can define arbitrary 
> metrics, I'd argue that there's need for documentation updates. In 
> particular, I find the notion of metric tags confusing. Tags can be defined 
> in a MetricConfig when the Metrics instance is constructed, 
> StreamsMetricsImpl is maintaining its own set of tags, and users can set tag 
> overrides.
> If a user were to get access to the Metrics instance, they would be missing 
> the tags defined in StreamsMetricsImpl. I'm imagining that users would want 
> their custom metrics to sit alongside the predefined metrics with the same 
> tags, and users shouldn't be expected to manage those additional tags 
> themselves.
> So, why are we allowing users to define their own metrics via the 
> StreamsMetrics interface in the first place? Is it that we'd like to be able 
> to provide a built-in latency metric, but the definition depends on the 
> details of the use case so there's no generic solution? That would be 
> sufficient motivation for this special case of addLatencySensor. If we want 
> to continue down that path and give users access to define a wider range of 
> custom metrics, I'd prefer to extend the StreamsMetrics interface so that 
> users can call methods on that object, automatically getting the tags 
> appropriate for that instance rather than interacting with the raw Metrics 
> instance.
> ---
> Guozhang had the following comments:
> 1) For the producer/consumer cases, all internal metrics are provided and 
> abstracted from users, and they just need to read the documentation to poll 
> whatever provided metrics that they are interested; and if they want to 
> define more metrics, they are likely to be outside the clients themselves and 
> they can use whatever methods they like, so Metrics do not need to be exposed 
> to users.
> 2) For streams, things are a bit different: users define the computational 
> logic, which becomes part of the "Streams Client" processing and may be of 
> interests to be monitored by user themselves; think of a customized processor 
> that sends an email to some address based on a condition, and users want to 
> monitor the average rate of emails sent. Hence it is worth considering 
> whether or not they should be able to access the Metrics instance to define 
> their own along side the pre-defined metrics provided by the library.
> 3) Now, since the Metrics class was not previously designed for public usage, 
> it is not designed to be very user-friendly for defining sensors, especially 
> the semantics differences between name / scope / tags. StreamsMetrics tries 
> to hide some of these semantics confusion from users, but it still expose 
> tags and hence is not perfect in doing so. We need to think of a better 
> approach so that: 1) user defined metrics will be "aligned" (i.e. with the 
> same name prefix within a single application, with similar scope hierarchy 
> definition, etc) with library provided metrics, 2) natural APIs to do so.
> I do not have concrete ideas about 3) above on top of my head, comments are 
> more than welcomed.
> ---
> I'm not sure that I agree that 1) and 2) are truly different situations. A 
> user might choose to send email messages within a bare consumer rather than a 

[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2016-12-14 Thread Tom DeVoe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749077#comment-15749077
 ] 

Tom DeVoe commented on KAFKA-4477:
--

Regarding 2 - I restarted the host which shrank all of it's ISRs at 21:30.
Regarding 3/4 - We will definitely be willing to try 0.10.1.1 when it is 
released, however we don't have an environment that we would be willing to test 
the patched version.



> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce (IG)
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Attachments: issue_node_1001.log, issue_node_1001_ext.log, 
> issue_node_1002.log, issue_node_1002_ext.log, issue_node_1003.log, 
> issue_node_1003_ext.log, kafka.jstack, state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occured in different 
> physical environments. We haven't worked out what is going on. We do though 
> have a nasty work around to keep service alive. 
> We do have not had this issue on clusters still running 0.9.01.
> We have noticed a node randomly shrinking for the partitions it owns the 
> ISR's down to itself, moments later we see other nodes having disconnects, 
> followed by finally app issues, where producing to these partitions is 
> blocked.
> It seems only by restarting the kafka instance java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-101: Alter Replication Protocol to use Leader Generation rather than High Watermark for Truncation

2016-12-14 Thread Jun Rao
Hi, Onur,

The reason for keeping track of the CZXID of the broker registration path
is the following. There is one corner case bug (KAFKA-1120) that Ben
mentioned where the controller could miss a ZK watcher event if the broker
deregisters and registers quickly. Always triggering a leader election (and
thus increasing the leader epoch) on broker registration event may work,
but we have to think through the controller failover logic. When the
controller initializes, it simply reads all current broker registration
from ZK. The controller doesn't know whether any broker registration has
changed since the previous controller has failed. Just blindly forcing
leader election on all partitions during the controller failover probably
adds too much overhead.

So, the idea is to have the broker tracks the broker -> CZXID mapping in
memory. Every time the controller changes the leader for a partition, the
controller stores the CZXID of the leader together with the leader broker
id (and leader epoch, controller epoch etc) in memory and in
/brokers/topics/[topic]/partitions/[partitionId]/state
(this is missing in the KIP wiki). Now if the controller gets a broker
registration event or when there is a controller failover, the controller
just needs to force a leader election if the CZXID of the broker
registration doesn't match the CZXID associated with the leader in
/brokers/topics/[topic]/partitions/[partitionId]/state.
This way, we will only do leader election when it's truly necessary.

The reason why this change is related to this KIP is that it also addresses
the issue of keeping the replicas identical during correlated failures. If
all replicas are down and the leader replica is the first being restarted,
by forcing the increase of leader epoch even though the leader remains on
the same replica, we can distinguish the data written since the leader
replica is restarted from those written by the same leader replica before
it's restarted. This allows us to maintain all replicas to be identical
even in the correlated failure case.

Thanks,

Jun

On Sun, Dec 11, 2016 at 3:54 PM, Onur Karaman 
wrote:

> Pretty happy to see a KIP tackling this problem! One comment below.
>
> The "Extending LeaderEpoch to include Returning Leaders" states:
> "To protect against this eventuality the controller will maintain a cached
> mapping of [broker -> Zookeeper CZXID] (CZXID is a unique and monotonic
> 64-bit number) for the broker’s registration in Zookeeper
> (/brokers/ids/[brokerId]). If the controller receives a Broker Registration
> where the CZXID has changed it will increment the Leader Epoch and
> propagate that value to the broker via the Leader and ISR Request (in the
> normal way), then update the cached CZXID for that broker."
>
> In general I think kafka underutilizes zookeeper's various flavors of zxids
> but this time it's not clear to me what the motivation is for maintaining
> the broker to czxid mapping. It seems that the following check is
> redundant: "If the controller receives a Broker Registration where the
> CZXID has changed". By definition, the czxid of the /brokers/ids/[brokerId]
> znode will always change upon successful broker registration (
> https://zookeeper.apache.org/doc/r3.4.8/zookeeperProgrammers.html#sc_
> zkStatStructure).
> Why maintain the mapping at all? Why not just always update leader epochs
> and propagate every time the controller receives the broker registration zk
> event?
>
> On Sun, Dec 11, 2016 at 2:30 PM, Neha Narkhede  wrote:
>
> > Good to see this KIP being proposed. Back when I added the epoch to the
> > replication protocol, we discussed adding it to the log due to the
> failure
> > scenarios listed in the KIP but I failed to convince people that it was
> > worth the effort needed to upgrade the cluster (especially after we asked
> > people to go through a painful backwards incompatible upgrade for 0.8
> :-))
> > The lack of including the leader epoch/generation in the log has also
> been
> > one of the biggest critiques of Kafka's replication protocol by the
> > distributed systems community.
> >
> > I'm in favor of this work though I think we shouldn't end up with 2
> notions
> > of representing a leader's generation. When we added the epoch, we wanted
> > to add it to the log but we didn't. Now that we are adding the generation
> > id to the log, I think we should revisit calling it the epoch at all.
> Have
> > you thought about a way to evolve the epoch to the generation id
> throughout
> > and what it will take?
> >
> > On Sun, Dec 11, 2016 at 4:31 AM Ben Stopford  wrote:
> >
> > > Hi All
> > >
> > > Please find the below KIP which describes a proposed solution to a
> couple
> > > of issues that have been observed with the replication protocol.
> > >
> > > In short, the proposal replaces the use of the High Watermark, for
> > > follower log trunctation, with an alternate Generation Marker. This
> > > uniquely defines which leader messages were acknowledged by.
> > >
> > >
>

[jira] [Assigned] (KAFKA-4531) Rationalise client configuration validation

2016-12-14 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4531:
--

Assignee: Vahid Hashemian

> Rationalise client configuration validation 
> 
>
> Key: KAFKA-4531
> URL: https://issues.apache.org/jira/browse/KAFKA-4531
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Vahid Hashemian
>
> The broker-side configuration has a {{validateValues()}} method that could be 
> introduced also in the client-side {{ProducerConfig}} and {{ConsumerConfig}} 
> classes.
> The rationale is to centralise constraints between values, like e.g. this one 
> currently in the {{KafkaConsumer}} constructor:
> {code}
> if (this.requestTimeoutMs <= sessionTimeOutMs || 
> this.requestTimeoutMs <= fetchMaxWaitMs)
> throw new 
> ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be 
> greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + 
> ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
> {code}
> or custom validation of the provided values, e.g. this one in the 
> {{KafkaProducer}} :
> {code}
> private static int parseAcks(String acksString) {
> try {
> return acksString.trim().equalsIgnoreCase("all") ? -1 : 
> Integer.parseInt(acksString.trim());
> } catch (NumberFormatException e) {
> throw new ConfigException("Invalid configuration value for 
> 'acks': " + acksString);
> }
> }
> {code}
> also some new KIPs, e.g. KIP-81 propose constraints among different values,
> so it would be good not to scatter them around.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4531) Rationalise client configuration validation

2016-12-14 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749120#comment-15749120
 ] 

Vahid Hashemian commented on KAFKA-4531:


[~ecomar] It would be nice to centralize all config validations in one place as 
you suggest. Please feel free to re-assign the JIRA if you meant to work on it 
yourself. Thanks.

> Rationalise client configuration validation 
> 
>
> Key: KAFKA-4531
> URL: https://issues.apache.org/jira/browse/KAFKA-4531
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Edoardo Comar
>Assignee: Vahid Hashemian
>
> The broker-side configuration has a {{validateValues()}} method that could be 
> introduced also in the client-side {{ProducerConfig}} and {{ConsumerConfig}} 
> classes.
> The rationale is to centralise constraints between values, like e.g. this one 
> currently in the {{KafkaConsumer}} constructor:
> {code}
> if (this.requestTimeoutMs <= sessionTimeOutMs || 
> this.requestTimeoutMs <= fetchMaxWaitMs)
> throw new 
> ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be 
> greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + 
> ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
> {code}
> or custom validation of the provided values, e.g. this one in the 
> {{KafkaProducer}} :
> {code}
> private static int parseAcks(String acksString) {
> try {
> return acksString.trim().equalsIgnoreCase("all") ? -1 : 
> Integer.parseInt(acksString.trim());
> } catch (NumberFormatException e) {
> throw new ConfigException("Invalid configuration value for 
> 'acks': " + acksString);
> }
> }
> {code}
> also some new KIPs, e.g. KIP-81 propose constraints among different values,
> so it would be good not to scatter them around.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: kafka_2.11-0.9.0.1 crash with java coredump

2016-12-14 Thread Apurva Mehta
I would suggest creating a JIRA and describing in detail what was going on
in the cluster when this happened, and posting the associated broker /
state change / controller logs.

Thanks,
Apurva

On Wed, Dec 14, 2016 at 3:28 AM, Mazhar Shaikh 
wrote:

> Hi All,
>
> I am using kafka_2.11-0.9.0.1 with java version "1.7.0_51".
>
> On random days kafka process stops (crashes) with a java coredump file as
> below.
>
> (gdb) bt
> #0 0x7f33059f70d5 in raise () from /lib/x86_64-linux-gnu/libc.so.6
> #1 0x7f33059fa83b in abort () from /lib/x86_64-linux-gnu/libc.so.6
> #2 0x7f33049ae405 in os::abort(bool) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #3 0x7f3304b2d347 in VMError::report_and_die() () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #4 0x7f3304b2d8de in crash_handler(int, siginfo*, void*) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #5 
> #6 0x7f33046b92f5 in
> G1BlockOffsetArray::forward_to_block_containing_addr_slow(HeapWord*,
> HeapWord*, void const*) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #7 0x7f33049a60f0 in os::print_location(outputStream*, long, bool) ()
> from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #8 0x7f33049b2678 in os::print_register_info(outputStream*, void*) ()
> from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #9 0x7f3304b2b94b in VMError::report(outputStream*) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #10 0x7f3304b2cf4a in VMError::report_and_die() () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #11 0x7f33049b2d8f in JVM_handle_linux_signal () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #12 
> #13 0x7f32ffbc64bf in ?? ()
> #14 0xca57b708 in ?? ()
> #15 0x7f32fae97928 in ?? ()
> #16 0xbf2f05e8 in ?? ()
> #17 0x in ?? ()
> #18 0xc3b27610 in ?? ()
> #19 0xbed92898 in ?? ()
> #20 0xe269aac8 in ?? ()
> #21 0x in ?? ()
>
>
> Can anyone suggest a solution to overcome this issue.
>
> Thank you.
>
> Mazhar Shaikh.
>


Re: kafka_2.11-0.9.0.1 crash with java coredump

2016-12-14 Thread Ismael Juma
Hi Mazhar,

That looks like a G1 GC bug. Lots of bug in G1 have been fixed since JDK 7
update 51. I highly recommend that you upgrade to a more recent JDK (JDK 8
recommended). Alternatively you could try the CMS GC.

Ismael

On Wed, Dec 14, 2016 at 3:28 AM, Mazhar Shaikh 
wrote:

> Hi All,
>
> I am using kafka_2.11-0.9.0.1 with java version "1.7.0_51".
>
> On random days kafka process stops (crashes) with a java coredump file as
> below.
>
> (gdb) bt
> #0 0x7f33059f70d5 in raise () from /lib/x86_64-linux-gnu/libc.so.6
> #1 0x7f33059fa83b in abort () from /lib/x86_64-linux-gnu/libc.so.6
> #2 0x7f33049ae405 in os::abort(bool) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #3 0x7f3304b2d347 in VMError::report_and_die() () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #4 0x7f3304b2d8de in crash_handler(int, siginfo*, void*) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #5 
> #6 0x7f33046b92f5 in
> G1BlockOffsetArray::forward_to_block_containing_addr_slow(HeapWord*,
> HeapWord*, void const*) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #7 0x7f33049a60f0 in os::print_location(outputStream*, long, bool) ()
> from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #8 0x7f33049b2678 in os::print_register_info(outputStream*, void*) ()
> from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #9 0x7f3304b2b94b in VMError::report(outputStream*) () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #10 0x7f3304b2cf4a in VMError::report_and_die() () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #11 0x7f33049b2d8f in JVM_handle_linux_signal () from
> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
> #12 
> #13 0x7f32ffbc64bf in ?? ()
> #14 0xca57b708 in ?? ()
> #15 0x7f32fae97928 in ?? ()
> #16 0xbf2f05e8 in ?? ()
> #17 0x in ?? ()
> #18 0xc3b27610 in ?? ()
> #19 0xbed92898 in ?? ()
> #20 0xe269aac8 in ?? ()
> #21 0x in ?? ()
>
>
> Can anyone suggest a solution to overcome this issue.
>
> Thank you.
>
> Mazhar Shaikh.
>


[jira] [Commented] (KAFKA-4431) HeartbeatThread should be a daemon thread

2016-12-14 Thread David Judd (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749174#comment-15749174
 ] 

David Judd commented on KAFKA-4431:
---

Awesome, thanks!

> HeartbeatThread should be a daemon thread
> -
>
> Key: KAFKA-4431
> URL: https://issues.apache.org/jira/browse/KAFKA-4431
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: David Judd
>Assignee: Rajini Sivaram
> Fix For: 0.10.1.1, 0.10.2.0
>
>
> We're seeing an issue where an exception inside the main processing loop of a 
> consumer doesn't cause the JVM to exit, as expected (and, in our case, 
> desired). From the thread dump, it appears that what's blocking exit is the 
> "kafka-coordinator-heartbeat-thread", which is not currently a daemon thread. 
> Per the mailing list, it sounds like this is a bug.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-14 Thread Apurva Mehta
Hi Ben,

You are are right on both counts:

Writing apps to do consume-process-produce batching will be tricky to
program using this API directly. The expectation is that 99% of the users
would use the streams API to leverage this functionality, and that API will
take care of the details. This seems fair, since this pattern is at the
core of stream processing.

Using an internally generated PID is definitely more a performance than a
correctness thing: we could generate UUIDs in the producer if no AppId is
specified, and that would also work. However, as you may have seen
elsewhere in the thread, there are calls for the PID to be 4 bytes (vs. the
present 8 bytes). So 16 bytes will be really far out. While the cost of a
PID is amortized across the messages in a message set, we still want to
keep it as small as possible to reduce the overhead.

We are thinking about how to recover expired PIDs which would enable us to
keep it to just 4 bytes (enough to handle 4billion concurrently alive
producers). However, this will be very very tricky because a given PID
could produce to multiple topic partitions, and recovering a PID will
require _all_ instances of it across _all_ topic partitions to be expired.
This would be very hard to achieve without invasive things like reference
counting. Still we are searching for a more elegant and lightweight
solution to the problem, and will use 4 byte PID if we can find an
acceptable solution for PID recovery.

Thanks,
Apurva



On Tue, Dec 13, 2016 at 9:30 PM, Ben Kirwin  wrote:

> Hi Apurva,
>
> Thanks for the detailed answers... and sorry for the late reply!
>
> It does sound like, if the input-partitions-to-app-id mapping never
> changes, the existing fencing mechanisms should prevent duplicates. Great!
> I'm a bit concerned the proposed API will be delicate to program against
> successfully -- even in the simple case, we need to create a new producer
> instance per input partition, and anything fancier is going to need its own
> implementation of the Streams/Samza-style 'task' idea -- but that may be
> fine for this sort of advanced feature.
>
> For the second question, I notice that Jason also elaborated on this
> downthread:
>
> > We also looked at removing the producer ID.
> > This was discussed somewhere above, but basically the idea is to store
> the
> > AppID in the message set header directly and avoid the mapping to
> producer
> > ID altogether. As long as batching isn't too bad, the impact on total
> size
> > may not be too bad, but we were ultimately more comfortable with a fixed
> > size ID.
>
> ...which suggests that the distinction is useful for performance, but not
> necessary for correctness, which makes good sense to me. (Would a 128-bid
> ID be a reasonable compromise? That's enough room for a UUID, or a
> reasonable hash of an arbitrary string, and has only a marginal increase on
> the message size.)
>
> On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta  wrote:
>
> > Hi Ben,
> >
> > Now, on to your first question of how deal with consumer rebalances. The
> > short answer is that the application needs to ensure that the the
> > assignment of input partitions to appId is consistent across rebalances.
> >
> > For Kafka streams, they already ensure that the mapping of input
> partitions
> > to task Id is invariant across rebalances by implementing a custom sticky
> > assignor. Other non-streams apps can trivially have one producer per
> input
> > partition and have the appId be the same as the partition number to
> achieve
> > the same effect.
> >
> > With this precondition in place, we can maintain transactions across
> > rebalances.
> >
> > Hope this answers your question.
> >
> > Thanks,
> > Apurva
> >
> > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin  wrote:
> >
> > > Thanks for this! I'm looking forward to going through the full proposal
> > in
> > > detail soon; a few early questions:
> > >
> > > First: what happens when a consumer rebalances in the middle of a
> > > transaction? The full documentation suggests that such a transaction
> > ought
> > > to be rejected:
> > >
> > > > [...] if a rebalance has happened and this consumer
> > > > instance becomes a zombie, even if this offset message is appended in
> > the
> > > > offset topic, the transaction will be rejected later on when it tries
> > to
> > > > commit the transaction via the EndTxnRequest.
> > >
> > > ...but it's unclear to me how we ensure that a transaction can't
> complete
> > > if a rebalance has happened. (It's quite possible I'm missing something
> > > obvious!)
> > >
> > > As a concrete example: suppose a process with PID 1 adds offsets for
> some
> > > partition to a transaction; a consumer rebalance happens that assigns
> the
> > > partition to a process with PID 2, which adds some offsets to its
> current
> > > transaction; both processes try and commit. Allowing both commits would
> > > cause the messages to be processed twice -- how is that avoided?
> > >
> > > Second: App IDs normally 

Jenkins build is back to normal : kafka-0.10.1-jdk7 #108

2016-12-14 Thread Apache Jenkins Server
See 



Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-14 Thread Apurva Mehta
Hi Rajini,

I think my original response to your point 15 was not accurate. The regular
definition of durability is that data once committed would never be lost.
So it is not enough for only the control messages to be flushed before
being acknowledged -- all the messages (and offset commits) which are part
of the transaction would need to be flushed before being acknowledged as
well.

Otherwise, it is possible that if all replicas of a topic partition crash
before the transactional messages are flushed, those messages will be lost
even if the commit marker exists in the log. In this case, the transaction
would be 'committed' with incomplete data.

Right now, there isn't any config which will ensure that the flush to disk
happens before the acknowledgement. We could add it in the future, and get
durability guarantees for kafka transactions.

I hope this clarifies the situation. The present KIP does not intend to add
the aforementioned config, so even the control messages are susceptible to
being lost if there is a simultaneous crash across all replicas. So
transactions are only as durable as existing Kafka messages. We don't
strengthen any durability guarantees as part of this KIP.

Thanks,
Apurva


On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram  wrote:

> Hi Apurva,
>
> Thank you for the answers. Just one follow-on.
>
> 15. Let me rephrase my original question. If all control messages (messages
> to transaction logs and markers on user logs) were acknowledged only after
> flushing the log segment, will transactions become durable in the
> traditional sense (i.e. not restricted to min.insync.replicas failures) ?
> This is not a suggestion to update the KIP. It seems to me that the design
> enables full durability if required in the future with a rather
> non-intrusive change. I just wanted to make sure I haven't missed anything
> fundamental that prevents Kafka from doing this.
>
>
>
> On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin  wrote:
>
> > Hi Apurva,
> >
> > Thanks for the detailed answers... and sorry for the late reply!
> >
> > It does sound like, if the input-partitions-to-app-id mapping never
> > changes, the existing fencing mechanisms should prevent duplicates.
> Great!
> > I'm a bit concerned the proposed API will be delicate to program against
> > successfully -- even in the simple case, we need to create a new producer
> > instance per input partition, and anything fancier is going to need its
> own
> > implementation of the Streams/Samza-style 'task' idea -- but that may be
> > fine for this sort of advanced feature.
> >
> > For the second question, I notice that Jason also elaborated on this
> > downthread:
> >
> > > We also looked at removing the producer ID.
> > > This was discussed somewhere above, but basically the idea is to store
> > the
> > > AppID in the message set header directly and avoid the mapping to
> > producer
> > > ID altogether. As long as batching isn't too bad, the impact on total
> > size
> > > may not be too bad, but we were ultimately more comfortable with a
> fixed
> > > size ID.
> >
> > ...which suggests that the distinction is useful for performance, but not
> > necessary for correctness, which makes good sense to me. (Would a 128-bid
> > ID be a reasonable compromise? That's enough room for a UUID, or a
> > reasonable hash of an arbitrary string, and has only a marginal increase
> on
> > the message size.)
> >
> > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta 
> wrote:
> >
> > > Hi Ben,
> > >
> > > Now, on to your first question of how deal with consumer rebalances.
> The
> > > short answer is that the application needs to ensure that the the
> > > assignment of input partitions to appId is consistent across
> rebalances.
> > >
> > > For Kafka streams, they already ensure that the mapping of input
> > partitions
> > > to task Id is invariant across rebalances by implementing a custom
> sticky
> > > assignor. Other non-streams apps can trivially have one producer per
> > input
> > > partition and have the appId be the same as the partition number to
> > achieve
> > > the same effect.
> > >
> > > With this precondition in place, we can maintain transactions across
> > > rebalances.
> > >
> > > Hope this answers your question.
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin  wrote:
> > >
> > > > Thanks for this! I'm looking forward to going through the full
> proposal
> > > in
> > > > detail soon; a few early questions:
> > > >
> > > > First: what happens when a consumer rebalances in the middle of a
> > > > transaction? The full documentation suggests that such a transaction
> > > ought
> > > > to be rejected:
> > > >
> > > > > [...] if a rebalance has happened and this consumer
> > > > > instance becomes a zombie, even if this offset message is appended
> in
> > > the
> > > > > offset topic, the transaction will be rejected later on when it
> tries
> > > to
> > > > > commit the transaction via the EndTxnRequest.
> > > >
> > > 

[DISCUSS] Control Messages - [Was: KIP-82 - Add Record Headers]

2016-12-14 Thread Ignacio Solis
I'm renaming this thread in case we start deep diving.

I'm in favor of so called "control messages", at least the notion of
those.  However, I'm not sure about the design.

What I understood from the original mail:

A. Provide a message that does not get returned by poll()
B. Provide a way for applications to consume these messages (sign up?)
C. Control messages would be associated with a topic.
D. Control messages should be _in_ the topic.



1. The first thing to point out is that this can be done with headers.
I assume that's why you sent it on the header thread. As you state, if
we had headers, you would not require a separate KIP.  So, in a way,
you're trying to provide a concrete use case for headers.  I wanted to
separate the discussion to a separate thread mostly because while I
like the idea, and I like the fact that it can be done by headers,
people might want to discuss alternatives.

2. I'm also assuming that you're intentionally trying to preserve
order. Headers could do this natively of course. You could also
achieve this with the separate topic given identifiers, sequence
numbers, headers, etc.  However...

3. There are a few use cases where ordering is important but
out-of-band is even more important. We have a few large workloads
where this is of interest to us.  Obviously we can achieve this with a
separate topic, but having a control channel for a topic that can send
high priority data would be interesting.   And yes, we would learn a
lot form the TCP experiences with the urgent pointer (
https://tools.ietf.org/html/rfc6093 ) and other out-of-band
communication techniques.

You have an example of a "shutdown marker".  This works ok as a
terminator, however, it is not very fast.  If I have 4 TB of data
because of asynchronous processing, then a shutdown marker at the end
of the 4TB is not as useful as having an out-of-band message that will
tell me immediately that those 4TB should not be processed.   So, from
this perspective, I prefer to have a separate topic and not embed
control messages with the data.

If the messages are part of the data, or associated to specific data,
then they should be in the data. If they are about process, we need an
out-of-band mechanism.


4. The general feeling I have gotten from a few people on the list is:
Why not just do this above the kafka clients?  After all, you could
have a system to ignore certain schemas.

Effectively, if we had headers, it would be done from a client
perspective, without the need to modify anything major.

If we wanted to do it with a separate topic, that could also be done
without any broker changes. But you could imagine wanting some broker
changes if the broker understands that 2 streams are tied together
then it may make decisions based on that.  This would be similar to
the handling of file system forks (
https://en.wikipedia.org/wiki/Fork_(file_system) )


5. Also heard on discussions about headers: we don't know if this is
generally useful. Maybe only a couple of institutions?  It may not be
worth it to modify the whole stack for that.

I would again say that with headers you could pull it off easily, even
if only for a subset of clients/applications wanted to use it.


So, in summary. I like the idea.  I see benefits in implementing it
through headers, but I also see benefits of having it as a separate
stream.  I'm not too in favor of having a separate message handling
pipeline for the same topic though.

Nacho





On Wed, Dec 14, 2016 at 9:51 AM, Matthias J. Sax  wrote:
> Yes and no. I did overload the term "control message".
>
> EOS control messages are for client-broker communication and thus never
> exposed to any application. And I think this is a good design because
> broker needs to understand those control messages. Thus, this should be
> a protocol change.
>
> The type of control messages I have in mind are for client-client
> (application-application) communication and the broker is agnostic to
> them. Thus, it should not be a protocol change.
>
>
> -Matthias
>
>
>
> On 12/14/16 9:42 AM, radai wrote:
>> arent control messages getting pushed as their own top level protocol
>> change (and a fairly massive one) for the transactions KIP ?
>>
>> On Tue, Dec 13, 2016 at 5:54 PM, Matthias J. Sax 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to add a completely new angle to this discussion. For this, I
>>> want to propose an extension for the headers feature that enables new
>>> uses cases -- and those new use cases might convince people to support
>>> headers (of course including the larger scoped proposal).
>>>
>>> Extended Proposal:
>>>
>>> Allow messages with a certain header key to be special "control
>>> messages" (w/ o w/o payload) that are not exposed to an application via
>>> .poll().
>>>
>>> Thus, a consumer client would automatically skip over those messages. If
>>> an application knows about embedded control messages, it can "sing up"
>>> to those messages by the consumer client and either get a callback or
>>> the

Re: kafka_2.11-0.9.0.1 crash with java coredump

2016-12-14 Thread Schumann,Robert
I would suggest trying a recent java version first, if I read about this one: 
http://mail.openjdk.java.net/pipermail/hotspot-gc-dev/2014-December/011534.html

Cheers
Robert

--
Robert Schumann | Lead DevOps Engineer | mobile.de GmbH
T: + 49. 30. 8109. 7219
M: +49.151. 5504. 8246
F: +49. 30. 8109. 7131
roschum...@ebay.com
www.mobile.de
 
Marktplatz 1 | 14532 Europarc Dreilinden | Germany
 
Geschaeftsfuehrer: Malte Krueger
HRB Nr.: 18517 P,  AG Potsdam
Sitz der Gesellschaft: Kleinmachnow
 

On 14/12/2016, 19:50, "Apurva Mehta"  wrote:

>I would suggest creating a JIRA and describing in detail what was going on
>in the cluster when this happened, and posting the associated broker /
>state change / controller logs.
>
>Thanks,
>Apurva
>
>On Wed, Dec 14, 2016 at 3:28 AM, Mazhar Shaikh 
>wrote:
>
>> Hi All,
>>
>> I am using kafka_2.11-0.9.0.1 with java version "1.7.0_51".
>>
>> On random days kafka process stops (crashes) with a java coredump file as
>> below.
>>
>> (gdb) bt
>> #0 0x7f33059f70d5 in raise () from /lib/x86_64-linux-gnu/libc.so.6
>> #1 0x7f33059fa83b in abort () from /lib/x86_64-linux-gnu/libc.so.6
>> #2 0x7f33049ae405 in os::abort(bool) () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #3 0x7f3304b2d347 in VMError::report_and_die() () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #4 0x7f3304b2d8de in crash_handler(int, siginfo*, void*) () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #5 
>> #6 0x7f33046b92f5 in
>> G1BlockOffsetArray::forward_to_block_containing_addr_slow(HeapWord*,
>> HeapWord*, void const*) () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #7 0x7f33049a60f0 in os::print_location(outputStream*, long, bool) ()
>> from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #8 0x7f33049b2678 in os::print_register_info(outputStream*, void*) ()
>> from /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #9 0x7f3304b2b94b in VMError::report(outputStream*) () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #10 0x7f3304b2cf4a in VMError::report_and_die() () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #11 0x7f33049b2d8f in JVM_handle_linux_signal () from
>> /opt/jdk1.7.0_51/jre/lib/amd64/server/libjvm.so
>> #12 
>> #13 0x7f32ffbc64bf in ?? ()
>> #14 0xca57b708 in ?? ()
>> #15 0x7f32fae97928 in ?? ()
>> #16 0xbf2f05e8 in ?? ()
>> #17 0x in ?? ()
>> #18 0xc3b27610 in ?? ()
>> #19 0xbed92898 in ?? ()
>> #20 0xe269aac8 in ?? ()
>> #21 0x in ?? ()
>>
>>
>> Can anyone suggest a solution to overcome this issue.
>>
>> Thank you.
>>
>> Mazhar Shaikh.
>>



Brokers cashing with OOME Map failed

2016-12-14 Thread Zakee
Recently, we have seen our brokers crash with below errors, any idea what might 
be wrong here?  The brokers have been running for long with the same 
hosts/configs without this issue before. Is this something to do with new 
version 0.10.0.1 (which we upgraded recently) or could it be a h/w issue?  10 
hosts are dedicated for one broker per host. Each host has 128 gb RAM and 20TB 
of storage mounts. Any pointers will help...


[2016-12-12 02:49:58,134] FATAL [app=broker] [ReplicaFetcherThread-15-15] 
[ReplicaFetcherThread-15-15], Disk error while replicating data for mytopic-19 
(kafka.server.ReplicaFetcherThread)
kafka.common.KafkaStorageException: I/O exception in append to log ’ mytopic-19'
at kafka.log.Log.append(Log.scala:349)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:159)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
at scala.Option.foreach(Option.scala:257)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:907)
at 
kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
at 
kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
at 
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
at 
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
at 
kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
at kafka.log.Log.roll(Log.scala:772)
at kafka.log.Log.maybeRoll(Log.scala:742)
at kafka.log.Log.append(Log.scala:405)
... 16 more
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method)
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:904)
... 28 more


Thanks
-Zakee

[GitHub] kafka pull request #2257: MINOR: Reenable streams smoke test

2016-12-14 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2257

MINOR: Reenable streams smoke test

I ran it 3 times and it works again.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka minor-reenable-smoke-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2257


commit ec77685d867582efde264042b7a4323b7bcaaaec
Author: Eno Thereska 
Date:   2016-12-14T20:49:49Z

Reenable




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4540) Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created

2016-12-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749465#comment-15749465
 ] 

Guozhang Wang commented on KAFKA-4540:
--

Is this already resolved as part of 
https://issues.apache.org/jira/browse/KAFKA-4509? In the patch we reorder the 
operations to first close all the suspended tasks and then try to create new 
tasks.

> Suspended tasks that are not assigned to the StreamThread need to be closed 
> before new active and standby tasks are created
> ---
>
> Key: KAFKA-4540
> URL: https://issues.apache.org/jira/browse/KAFKA-4540
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> When partition assignment happens we first try and add the active tasks and 
> then add the standby tasks. The problem with this is that a new active task 
> might already be an existing suspended standby task. if this is the case then 
> when the active task initialises it will throw an exception from RocksDB:
> {{Caused by: org.rocksdb.RocksDBException: IO error: lock 
> /tmp/kafka-streams-7071/kafka-music-charts/1_1/rocksdb/all-songs/LOCK: No 
> locks available}}
> We need to make sure we have removed an closed any no-longer assigned 
> Suspended tasks before creating new tasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2257: MINOR: Reenable streams smoke test

2016-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2257


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4528) Failure in kafka.producer.ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout

2016-12-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749479#comment-15749479
 ] 

Guozhang Wang commented on KAFKA-4528:
--

Another observed failure: 
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/139/console

> Failure in 
> kafka.producer.ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout
> ---
>
> Key: KAFKA-4528
> URL: https://issues.apache.org/jira/browse/KAFKA-4528
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>  Labels: newbie
>
> I have seen this failure a few times in the past few days, worth 
> investigating.
> Example:
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/79/testReport/junit/kafka.producer/ProducerTest/testAsyncSendCanCorrectlyFailWithTimeout/
> {code}
> Stacktrace
> java.lang.AssertionError: Message set should have 1 message
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.producer.ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout(ProducerTest.scala:313)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>   at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run

[jira] [Work stopped] (KAFKA-4437) Incremental Batch Processing for Kafka Streams

2016-12-14 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4437 stopped by Matthias J. Sax.
--
> Incremental Batch Processing for Kafka Streams
> --
>
> Key: KAFKA-4437
> URL: https://issues.apache.org/jira/browse/KAFKA-4437
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> We want to add an “auto stop” feature that terminate a stream application 
> when it has processed all the data that was newly available at the time the 
> application started (to at current end-of-log, i.e., current high watermark). 
> This allows to chop the (infinite) log into finite chunks where each run for 
> the application processes one chunk. This feature allows for incremental 
> batch-like processing; think "start-process-stop-restart-process-stop-..."
> For details see KIP-95: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4540) Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created

2016-12-14 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749549#comment-15749549
 ] 

Damian Guy commented on KAFKA-4540:
---

This issue isn't resolved. The problem here is that say i previously had 
standby task 1_0, then onPartitionsRevoked it gets suspended. Then 
onPartitionsAssigned it becomes an active task. When we try to create the 
Active Task for 1_0 we still have a Standby Task "open" for the same task and 
we get the exception above.

> Suspended tasks that are not assigned to the StreamThread need to be closed 
> before new active and standby tasks are created
> ---
>
> Key: KAFKA-4540
> URL: https://issues.apache.org/jira/browse/KAFKA-4540
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> When partition assignment happens we first try and add the active tasks and 
> then add the standby tasks. The problem with this is that a new active task 
> might already be an existing suspended standby task. if this is the case then 
> when the active task initialises it will throw an exception from RocksDB:
> {{Caused by: org.rocksdb.RocksDBException: IO error: lock 
> /tmp/kafka-streams-7071/kafka-music-charts/1_1/rocksdb/all-songs/LOCK: No 
> locks available}}
> We need to make sure we have removed an closed any no-longer assigned 
> Suspended tasks before creating new tasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-66 Kafka Connect Transformers for messages

2016-12-14 Thread Shikhar Bhushan
With regard to a), just using `ConnectRecord` with `newRecord` as a new
abstract method would be a fine choice. In prototyping, both options end up
looking pretty similar (in terms of how transformations are implemented and
the runtime initializes and uses them) and I'm starting to lean towards not
adding a new interface into the mix.

On b) I think we should include a small set of useful transformations with
Connect, since they can be applicable across different connectors and we
should encourage some standardization for common operations. I'll update
KIP-66 soon including a spec of transformations that I believe are worth
including.

On Sat, Dec 10, 2016 at 11:52 PM Ewen Cheslack-Postava 
wrote:

If anyone has time to review here, it'd be great to get feedback. I'd
imagine that the proposal itself won't be too controversial -- keeps
transformations simple (by only allowing map/filter), doesn't affect the
rest of the framework much, and fits in with general config structure we've
used elsewhere (although ConfigDef could use some updates to make this
easier...).

I think the main open questions for me are:

a) Is TransformableRecord worth it to avoid reimplementing small bits of
code (it allows for a single implementation of the interface to trivially
apply to both Source and SinkRecords). I think I prefer this, but it does
come with some commitment to another interface on top of ConnectRecord. We
could alternatively modify ConnectRecord which would require fewer changes.
b) How do folks feel about built-in transformations and the set that are
mentioned here? This brings us way back to the discussion of built-in
connectors. Transformations, especially when intended to be lightweight and
touch nothing besides the data already in the record, seem different from
connectors -- there might be quite a few, but hopefully limited. Since we
(hopefully) already factor out most serialization-specific stuff via
Converters, I think we can keep this pretty limited. That said, I have no
doubt some folks will (in my opinion) abuse this feature to do data
enrichment by querying external systems, so building a bunch of
transformations in could potentially open the floodgates, or at least make
decisions about what is included vs what should be 3rd party muddy.

-Ewen


On Wed, Dec 7, 2016 at 11:46 AM, Shikhar Bhushan 
wrote:

> Hi all,
>
> I have another iteration at a proposal for this feature here:
> https://cwiki.apache.org/confluence/display/KAFKA/
> Connect+Transforms+-+Proposed+Design
>
> I'd welcome your feedback and comments.
>
> Thanks,
>
> Shikhar
>
> On Tue, Aug 2, 2016 at 7:21 PM Ewen Cheslack-Postava 
> wrote:
>
> On Thu, Jul 28, 2016 at 11:58 PM, Shikhar Bhushan 
> wrote:
>
> > >
> > >
> > > Hmm, operating on ConnectRecords probably doesn't work since you need
> to
> > > emit the right type of record, which might mean instantiating a new
> one.
> > I
> > > think that means we either need 2 methods, one for SourceRecord, one
> for
> > > SinkRecord, or we'd need to limit what parts of the message you can
> > modify
> > > (e.g. you can change the key/value via something like
> > > transformKey(ConnectRecord) and transformValue(ConnectRecord), but
> other
> > > fields would remain the same and the fmwk would handle allocating new
> > > Source/SinkRecords if needed)
> > >
> >
> > Good point, perhaps we could add an abstract method on ConnectRecord
that
> > takes all the shared fields as parameters and the implementations return
> a
> > copy of the narrower SourceRecord/SinkRecord type as appropriate.
> > Transformers would only operate on ConnectRecord rather than caring
about
> > SourceRecord or SinkRecord (in theory they could instanceof/cast, but
the
> > API should discourage it)
> >
> >
> > > Is there a use case for hanging on to the original? I can't think of a
> > > transformation where you'd need to do that (or couldn't just order
> things
> > > differently so it isn't a problem).
> >
> >
> > Yeah maybe this isn't really necessary. No strong preference here.
> >
> > That said, I do worry a bit that farming too much stuff out to
> transformers
> > > can result in "programming via config", i.e. a lot of the simplicity
> you
> > > get from Connect disappears in long config files. Standardization
would
> > be
> > > nice and might just avoid this (and doesn't cost that much
implementing
> > it
> > > in each connector), and I'd personally prefer something a bit less
> > flexible
> > > but consistent and easy to configure.
> >
> >
> > Not sure what the you're suggesting :-) Standardized config properties
> for
> > a small set of transformations, leaving it upto connectors to integrate?
> >
>
> I just mean that you get to the point where you're practically writing a
> Kafka Streams application, you're just doing it through either an
> incredibly convoluted set of transformers and configs, or a single
> transformer with incredibly convoluted set of configs. You basically get
to
> the point where you're config is a min

[jira] [Commented] (KAFKA-1521) Producer Graceful Shutdown issue in Container (Kafka version 0.8.x.x)

2016-12-14 Thread Anish Khanzode (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749639#comment-15749639
 ] 

Anish Khanzode commented on KAFKA-1521:
---

This is really a problem when kafka consumer is used in a embedded environment. 
I would love to have API that can let me pass pluggable metrics system or 
ignore if dont care.
Does new API also suffer from same issue? 

> Producer Graceful Shutdown issue in Container (Kafka version 0.8.x.x)
> -
>
> Key: KAFKA-1521
> URL: https://issues.apache.org/jira/browse/KAFKA-1521
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.0, 0.8.1.1
> Environment: Tomcat Container or Any other J2EE container
>Reporter: Bhavesh Mistry
>Assignee: Jun Rao
>Priority: Minor
>
> Hi Kafka Team,
> We are running multiple webapps in tomcat container, and we have producer 
> which are managed by the ServletContextListener (Lifecycle).  Upon  
> contextInitialized we create and on contextDestroyed we call the 
> producer.close() but underlying Metric Lib does not shutdown.  So we have 
> thread leak due to this issue.  I had to call 
> Metrics.defaultRegistry().shutdown() to resolve this issue.  is this know 
> issue ? I know the metric lib have JVM Shutdown hook, but it will not be 
> invoke since the contain thread is un-deploying the web app and class loader 
> goes way and leaking thread does not find the under lying Kafka class.
> Because of this tomcat, it not shutting down gracefully.
> Are you guys planing to un-register metrics when Producer close is called or 
> shutdown Metrics pool for client.id ? 
> Here is logs:
> SEVERE: The web application [  ] appears to have started a thread named 
> [metrics-meter-tick-thread-1] but has failed to stop it. This is very likely 
> to create a memory leak.
> SEVERE: The web application [] appears to have started a thread named 
> [metrics-meter-tick-thread-2] but has failed to stop it. This is very likely 
> to create a memory leak.
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4540) Suspended tasks that are not assigned to the StreamThread need to be closed before new active and standby tasks are created

2016-12-14 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749636#comment-15749636
 ] 

Matthias J. Sax commented on KAFKA-4540:


This makes sense. The problem is that 
{{ConsumerRebalanceListener#onPartitionsAssigned()}} first calls 
{{addStreamTasks()}} and afterwards calls {{addStandbyTasks()}}.

> Suspended tasks that are not assigned to the StreamThread need to be closed 
> before new active and standby tasks are created
> ---
>
> Key: KAFKA-4540
> URL: https://issues.apache.org/jira/browse/KAFKA-4540
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.2.0
>
>
> When partition assignment happens we first try and add the active tasks and 
> then add the standby tasks. The problem with this is that a new active task 
> might already be an existing suspended standby task. if this is the case then 
> when the active task initialises it will throw an exception from RocksDB:
> {{Caused by: org.rocksdb.RocksDBException: IO error: lock 
> /tmp/kafka-streams-7071/kafka-music-charts/1_1/rocksdb/all-songs/LOCK: No 
> locks available}}
> We need to make sure we have removed an closed any no-longer assigned 
> Suspended tasks before creating new tasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2016-12-14 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749722#comment-15749722
 ] 

Apurva Mehta commented on KAFKA-4477:
-

Hi [~tdevoe] (and other as well), 

Would it be possible to enable trace logging for kafka requsets in your 
production cluster? 

To do this you would have to set `log4j.logger.kafka.request.logger=TRACE, 
requestAppender` in you log4j.properties file. 

This would enable us to get the information of when the brokers sent responses 
to particular requests. From the data you have shared, it appears that the 
broker 1002 process the fetch requests from replicas part way, and then somehow 
it gets lost. With the trace logging, we would be able to isolate the problem 
to either the server or the client, and then redouble our focus. 

If you do enable trace logging for requests, then the output will be in the  
`kafka-request.log` file in your regular log directory. 

Thanks,
Apurva

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce (IG)
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Attachments: issue_node_1001.log, issue_node_1001_ext.log, 
> issue_node_1002.log, issue_node_1002_ext.log, issue_node_1003.log, 
> issue_node_1003_ext.log, kafka.jstack, state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occured in different 
> physical environments. We haven't worked out what is going on. We do though 
> have a nasty work around to keep service alive. 
> We do have not had this issue on clusters still running 0.9.01.
> We have noticed a node randomly shrinking for the partitions it owns the 
> ISR's down to itself, moments later we see other nodes having disconnects, 
> followed by finally app issues, where producing to these partitions is 
> blocked.
> It seems only by restarting the kafka instance java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1696) Kafka should be able to generate Hadoop delegation tokens

2016-12-14 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-1696:
--
Issue Type: New Feature  (was: Sub-task)
Parent: (was: KAFKA-1682)

> Kafka should be able to generate Hadoop delegation tokens
> -
>
> Key: KAFKA-1696
> URL: https://issues.apache.org/jira/browse/KAFKA-1696
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
>
> For access from MapReduce/etc jobs run on behalf of a user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2016-12-14 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749750#comment-15749750
 ] 

Ismael Juma commented on KAFKA-4477:


One clarification: if you use the `log4j.properties` file that is shipped with 
Kafka, you should edit the existing line instead of adding a new one:

log4j.logger.kafka.request.logger=WARN, requestAppender

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce (IG)
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Attachments: issue_node_1001.log, issue_node_1001_ext.log, 
> issue_node_1002.log, issue_node_1002_ext.log, issue_node_1003.log, 
> issue_node_1003_ext.log, kafka.jstack, state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occured in different 
> physical environments. We haven't worked out what is going on. We do though 
> have a nasty work around to keep service alive. 
> We do have not had this issue on clusters still running 0.9.01.
> We have noticed a node randomly shrinking for the partitions it owns the 
> ISR's down to itself, moments later we see other nodes having disconnects, 
> followed by finally app issues, where producing to these partitions is 
> blocked.
> It seems only by restarting the kafka instance java process resolves the 
> issues.
> We have had this occur multiple times and from all network and machine 
> monitoring the machine never left the network, or had any other glitches.
> Below are seen logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> amount of close_waits and file descriptors.
> As a work around to keep service we are currently putting in an automated 
> process that tails and regex's for: and where new_partitions hit just itself 
> we restart the node. 
> "\[(?P.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P.+) to (?P.+) 
> \(kafka.cluster.Partition\)"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4541) Add capability to create delegation token

2016-12-14 Thread Ashish K Singh (JIRA)
Ashish K Singh created KAFKA-4541:
-

 Summary: Add capability to create delegation token
 Key: KAFKA-4541
 URL: https://issues.apache.org/jira/browse/KAFKA-4541
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ashish K Singh


Add request/ response and server side handling to create delegation tokens.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #1103

2016-12-14 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: Reenable streams smoke test

--
[...truncated 17361 lines...]
org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup PASSED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose STARTED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] FAILED
java.lang.AssertionError: Condition not met within timeout 3. waiting 
for store count-by-key
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:283)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable(QueryableStateIntegrationTest.java:491)

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] FAILED
java.lang.AssertionError: Condition not met within timeout 6. Did not 
receive 1 number of records
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:283)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:251)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.waitUntilAtLeastNumRecordProcessed(QueryableStateIntegrationTest.java:658)
at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.queryOnRebalance(QueryableStateIntegrationTest.java:341)

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[1] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[1] PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMult

[jira] [Created] (KAFKA-4544) Add capability to renew/expire delegation tokens.

2016-12-14 Thread Ashish K Singh (JIRA)
Ashish K Singh created KAFKA-4544:
-

 Summary: Add capability to renew/expire delegation tokens.
 Key: KAFKA-4544
 URL: https://issues.apache.org/jira/browse/KAFKA-4544
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ashish K Singh


Add capability to renew/expire delegation tokens.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4543) Add capability to renew/expire delegation tokens.

2016-12-14 Thread Ashish K Singh (JIRA)
Ashish K Singh created KAFKA-4543:
-

 Summary: Add capability to renew/expire delegation tokens.
 Key: KAFKA-4543
 URL: https://issues.apache.org/jira/browse/KAFKA-4543
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ashish K Singh


Add capability to renew/expire delegation tokens.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4544) Add system tests for delegation token based authentication

2016-12-14 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-4544:
--
Summary: Add system tests for delegation token based authentication  (was: 
Add capability to renew/expire delegation tokens.)

> Add system tests for delegation token based authentication
> --
>
> Key: KAFKA-4544
> URL: https://issues.apache.org/jira/browse/KAFKA-4544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ashish K Singh
>
> Add capability to renew/expire delegation tokens.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4544) Add system tests for delegation token based authentication

2016-12-14 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-4544:
--
Description: Add system tests for delegation token based authentication.  
(was: Add capability to renew/expire delegation tokens.)

> Add system tests for delegation token based authentication
> --
>
> Key: KAFKA-4544
> URL: https://issues.apache.org/jira/browse/KAFKA-4544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ashish K Singh
>
> Add system tests for delegation token based authentication.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4541) Add capability to create delegation token

2016-12-14 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh reassigned KAFKA-4541:
-

Assignee: Ashish K Singh

> Add capability to create delegation token
> -
>
> Key: KAFKA-4541
> URL: https://issues.apache.org/jira/browse/KAFKA-4541
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> Add request/ response and server side handling to create delegation tokens.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1696) Kafka should be able to generate Hadoop delegation tokens

2016-12-14 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749769#comment-15749769
 ] 

Ashish K Singh commented on KAFKA-1696:
---

[~omkreddy] I had to convert this JIRA from sub-task to a new feature JIRA, to 
be able to create sub-tasks. Also created some sub-tasks. I have left most of 
them open, except the first one, for which I plan to submit a PR shortly. Also, 
I have partially looked into authentication via tokens, basically add a 
TokenAuthenticator, so if you are Ok with me taking up that task, I will be 
happy to.

> Kafka should be able to generate Hadoop delegation tokens
> -
>
> Key: KAFKA-1696
> URL: https://issues.apache.org/jira/browse/KAFKA-1696
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
>
> For access from MapReduce/etc jobs run on behalf of a user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4542) Add authentication based on delegation token.

2016-12-14 Thread Ashish K Singh (JIRA)
Ashish K Singh created KAFKA-4542:
-

 Summary: Add authentication based on delegation token.
 Key: KAFKA-4542
 URL: https://issues.apache.org/jira/browse/KAFKA-4542
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ashish K Singh


Add authentication based on delegation token.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2258: MINOR: update KStream JavaDocs

2016-12-14 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2258

MINOR: update KStream JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka minorKStreamJavaDoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2258.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2258


commit 7e9aa16a7c5e8bdb1a48f7639efd1b448c07a611
Author: Matthias J. Sax 
Date:   2016-12-15T00:41:45Z

MINOR: update KStream JavaDocs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3856) Move inner classes accessible only functions in TopologyBuilder out of public APIs

2016-12-14 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750030#comment-15750030
 ] 

Matthias J. Sax commented on KAFKA-3856:


[~jeyhunkarimov] Are you still working on this?

> Move inner classes accessible only functions in TopologyBuilder out of public 
> APIs
> --
>
> Key: KAFKA-3856
> URL: https://issues.apache.org/jira/browse/KAFKA-3856
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> In {{TopologyBuilder}} there are a couple of public functions that are 
> actually only used in the internal classes such as StreamThread and 
> StreamPartitionAssignor, and some accessible only in high-level DSL inner 
> classes, examples include {{addInternalTopic}}, {{sourceGroups}} and 
> {{copartitionGroups}}, etc. But they are still listed in Javadocs since this 
> class is part of public APIs.
> We should think about moving them out of the public functions. Unfortunately 
> there is no "friend" access mode as in C++, so we need to think of another 
> way.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3135) Unexpected delay before fetch response transmission

2016-12-14 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750082#comment-15750082
 ] 

Jeff Widman commented on KAFKA-3135:


Can the tags on this issue be updated to note that it applies to 0.10.x 
versions as well? 

At least, that's what it seems from reading through the issue description.

> Unexpected delay before fetch response transmission
> ---
>
> Key: KAFKA-3135
> URL: https://issues.apache.org/jira/browse/KAFKA-3135
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.2.0
>
>
> From the user list, Krzysztof Ciesielski reports the following:
> {quote}
> Scenario description:
> First, a producer writes 50 elements into a topic
> Then, a consumer starts to read, polling in a loop.
> When "max.partition.fetch.bytes" is set to a relatively small value, each
> "consumer.poll()" returns a batch of messages.
> If this value is left as default, the output tends to look like this:
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 0 elements
> Poll returned 13793 elements
> Poll returned 13793 elements
> As we can see, there are weird "gaps" when poll returns 0 elements for some
> time. What is the reason for that? Maybe there are some good practices
> about setting "max.partition.fetch.bytes" which I don't follow?
> {quote}
> The gist to reproduce this problem is here: 
> https://gist.github.com/kciesielski/054bb4359a318aa17561.
> After some initial investigation, the delay appears to be in the server's 
> networking layer. Basically I see a delay of 5 seconds from the time that 
> Selector.send() is invoked in SocketServer.Processor with the fetch response 
> to the time that the send is completed. Using netstat in the middle of the 
> delay shows the following output:
> {code}
> tcp4   0  0  10.191.0.30.55455  10.191.0.30.9092   ESTABLISHED
> tcp4   0 102400  10.191.0.30.9092   10.191.0.30.55454  ESTABLISHED
> {code}
> From this, it looks like the data reaches the send buffer, but needs to be 
> flushed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   >