[jira] [Created] (KAFKA-4963) Global Store: startup recovery process skipping processor

2017-03-28 Thread Yennick Trevels (JIRA)
Yennick Trevels created KAFKA-4963:
--

 Summary: Global Store: startup recovery process skipping processor
 Key: KAFKA-4963
 URL: https://issues.apache.org/jira/browse/KAFKA-4963
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Yennick Trevels


This issue is related to the recovery process of a global store. It might be 
that I'm misunderstanding the design of the global store as it's all quite new 
to me, but I wanted to verify this case.
I'm trying to create a global store with a processor which transforms the 
values from the source and puts them into the state store, and I want all these 
transformed values to be available in every streams job (hence the use of a 
global store).
I'll give you an example which I created based on an existing Kafka Streams 
unit test:

{code}
final StateStoreSupplier storeSupplier = Stores.create("my-store")
    .withStringKeys().withIntegerValues().inMemory().disableLogging().build();
final String global = "global";
final String topic = "topic";
final KeyValueStore<String, Integer> globalStore =
    (KeyValueStore<String, Integer>) storeSupplier.get();
final TopologyBuilder topologyBuilder = this.builder
    .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER,
        topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));

driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
assertEquals("value1".length(), globalStore.get("key1"));
assertEquals("value2".length(), globalStore.get("key2"));
{code}

The ValueToLengthStatefulProcessor basically takes the incoming value, 
calculates the length of the string, and puts the result in the state store. 
Note the difference in types between the source stream (string values) and the 
state store (integer values).
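For reference, a minimal sketch of what such a processor might look like against the 0.10.2 Processor API (simplified illustration only; the exact class is not important for the issue):

{code}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Reads string values from the source topic and stores their lengths as integers.
public class ValueToLengthStatefulProcessor extends AbstractProcessor<String, String> {
    private final String storeName;
    private KeyValueStore<String, Integer> store;

    public ValueToLengthStatefulProcessor(final String storeName) {
        this.storeName = storeName;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, Integer>) context.getStateStore(storeName);
    }

    @Override
    public void process(final String key, final String value) {
        // The transformation: string value in, integer length out.
        store.put(key, value.length());
    }
}
{code}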

If I understand global stores correctly and based on what I've tried out 
already, the stream of data runs like this:
a source stream named "global" reading values from a Kafka topic called "topic" 
 -> ValueToLengthStatefulProcessor -> "my-store" state store

However, when the streams job starts up it runs the recovery process by reading 
the source stream again. I've noticed that in this case it seems to skip the 
processor entirely and act as if the source stream were the changelog of the 
state store, making the data flow like this during the recovery process:
source stream -> "my-store" state store

Because it acts as if the source stream were the changelog of the state store, it 
also tries to use the deserializer of the state store. This won't work, since the 
values of the state store should be integers while the values in the source 
stream are strings.
So all this will start up nicely as long as the source stream has no values yet. 
However, once the source stream has (string) values, the startup recovery 
process will fail, since it will write the source values directly into the state 
store instead of passing them to the processor.

I believe this is caused by the following line of code in 
TopologyBuilder.addGlobalStore, which connects the store directly to the source 
topic.
https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507

Please let me know if I'm totally misunderstanding how global stores should 
work. But I think this might be a crucial bug or design flaw.





[jira] [Created] (KAFKA-4964) Delete the kafka to prefix the name of the keystore and truststore file will be more suitable

2017-03-28 Thread shuguo zheng (JIRA)
shuguo zheng created KAFKA-4964:
---

 Summary: Delete the kafka to prefix the name of the keystore and 
truststore file will be more suitable
 Key: KAFKA-4964
 URL: https://issues.apache.org/jira/browse/KAFKA-4964
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0, 0.10.0.1, 0.10.0.0, 
0.9.0.1, 0.9.0.0
 Environment: 0.10.0.1
Reporter: shuguo zheng
Priority: Minor


Prefixing the name of the keystore and truststore files with "kafka" can be 
misleading, because the preceding steps in the documentation generate those 
files without that prefix. Deleting the prefix may be helpful to Kafka SSL 
beginners.





[GitHub] kafka pull request #2748: MINOR: remove misleading scala setter in Controlle...

2017-03-28 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2748

MINOR: remove misleading scala setter in ControllerContext



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka 
remove-scala-setter-controller-context

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2748


commit ba2b26a98d7c4b2af48d3119fa601ac68ba8ba4a
Author: Onur Karaman 
Date:   2017-03-28T08:01:56Z

remove misleading scala setter in ControllerContext






[GitHub] kafka pull request #2749: [KAFKA-4964]Delete the kafka to prefix the name of...

2017-03-28 Thread zhengsg
GitHub user zhengsg opened a pull request:

https://github.com/apache/kafka/pull/2749

[KAFKA-4964]Delete the kafka to prefix the name of the keystore and 
truststore file will be more suitable

[https://issues.apache.org/jira/browse/KAFKA-4964](url)
Prefixing the name of the keystore and truststore files with "kafka" can be 
misleading, because the preceding steps in the documentation generate those 
files without that prefix. Deleting the prefix may be helpful to Kafka SSL 
beginners.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhengsg/kafka local

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2749


commit 9fc6529887ec2ed2b499eeb2fba842e581f0315a
Author: shuguo zheng 
Date:   2017-03-28T08:04:24Z

KAFKA-4964:Kafka to prefix the name of the keystore and truststore file,
will possible to cause misdirection, because of according to
the previous steps to generate the file name without that prefix.
Delete the prefix may helpful to the kafka SSL beginners.

commit d927e01e1ab7b85808063e5a2faf9e7d11811885
Author: shuguo zheng 
Date:   2017-03-28T08:15:12Z

hh

commit ebdf4ddcb5af912437ba095e8ef742ca506bf2b0
Author: shuguo zheng 
Date:   2017-03-28T08:21:21Z

Delete white line






[jira] [Commented] (KAFKA-4964) Delete the kafka to prefix the name of the keystore and truststore file will be more suitable

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944758#comment-15944758
 ] 

ASF GitHub Bot commented on KAFKA-4964:
---

GitHub user zhengsg opened a pull request:

https://github.com/apache/kafka/pull/2749

[KAFKA-4964]Delete the kafka to prefix the name of the keystore and 
truststore file will be more suitable

[https://issues.apache.org/jira/browse/KAFKA-4964](url)
Prefixing the name of the keystore and truststore files with "kafka" can be 
misleading, because the preceding steps in the documentation generate those 
files without that prefix. Deleting the prefix may be helpful to Kafka SSL 
beginners.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhengsg/kafka local

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2749


commit 9fc6529887ec2ed2b499eeb2fba842e581f0315a
Author: shuguo zheng 
Date:   2017-03-28T08:04:24Z

KAFKA-4964:Kafka to prefix the name of the keystore and truststore file,
will possible to cause misdirection, because of according to
the previous steps to generate the file name without that prefix.
Delete the prefix may helpful to the kafka SSL beginners.

commit d927e01e1ab7b85808063e5a2faf9e7d11811885
Author: shuguo zheng 
Date:   2017-03-28T08:15:12Z

hh

commit ebdf4ddcb5af912437ba095e8ef742ca506bf2b0
Author: shuguo zheng 
Date:   2017-03-28T08:21:21Z

Delete white line




> Delete the kafka to prefix the name of the keystore and truststore file will 
> be more suitable
> -
>
> Key: KAFKA-4964
> URL: https://issues.apache.org/jira/browse/KAFKA-4964
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 
> 0.10.1.1, 0.10.2.0
> Environment: 0.10.0.1
>Reporter: shuguo zheng
>Priority: Minor
>
> Prefixing the name of the keystore and truststore files with "kafka" can be 
> misleading, because the preceding steps in the documentation generate those 
> files without that prefix. Deleting the prefix may be helpful to Kafka SSL 
> beginners.





[jira] [Created] (KAFKA-4965) set leave.group.on.close to false in KafkaStreams

2017-03-28 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4965:
-

 Summary: set leave.group.on.close to false in KafkaStreams
 Key: KAFKA-4965
 URL: https://issues.apache.org/jira/browse/KAFKA-4965
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.11.0.0
Reporter: Damian Guy
Assignee: Damian Guy


We added the internal Consumer Config {{leave.group.on.close}} so that we can 
avoid unnecessary rebalances during bounces in streams etc. We just need to set 
this config to false.
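For illustration, a rough sketch of the kind of override this implies when Streams builds the configuration for its internal consumers (the exact location in StreamsConfig is an assumption here; the config name is the one used later in this ticket and its PR):

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch only: force the internal flag off for the consumers created by Kafka Streams,
// so closing a consumer during a bounce does not send a LeaveGroupRequest and
// therefore does not trigger an immediate rebalance.
final Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put("internal.leave.group.on.close", false);
{code}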





[jira] [Work started] (KAFKA-4965) set leave.group.on.close to false in KafkaStreams

2017-03-28 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4965 started by Damian Guy.
-
> set leave.group.on.close to false in KafkaStreams
> -
>
> Key: KAFKA-4965
> URL: https://issues.apache.org/jira/browse/KAFKA-4965
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> We added the internal Consumer Config {{leave.group.on.close}} so that we can 
> avoid unnecessary rebalances during bounces in streams etc. We just need to 
> set this config to false.





[jira] [Updated] (KAFKA-4965) set internal.leave.group.on.close to false in KafkaStreams

2017-03-28 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4965:
--
Summary: set internal.leave.group.on.close to false in KafkaStreams  (was: 
set leave.group.on.close to false in KafkaStreams)

> set internal.leave.group.on.close to false in KafkaStreams
> --
>
> Key: KAFKA-4965
> URL: https://issues.apache.org/jira/browse/KAFKA-4965
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> We added the internal Consumer Config {{leave.group.on.close}} so that we can 
> avoid unnecessary rebalances during bounces in streams etc. We just need to 
> set this config to false.





[GitHub] kafka pull request #2750: KAFKA-4965: set internal.leave.group.on.close to f...

2017-03-28 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2750

KAFKA-4965: set internal.leave.group.on.close to false in StreamsConfig

Set the internal consumer config internal.leave.group.on.close in 
`StreamsConfig`. This is to reduce the number of rebalances we get during 
bounces.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4965

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2750


commit 482c2259ef20f436fcc2dfe1a329b54cbc733db7
Author: Damian Guy 
Date:   2017-03-28T08:40:04Z

set internal.leave.group.on.close to false in StreamsConfig






[jira] [Commented] (KAFKA-4965) set internal.leave.group.on.close to false in KafkaStreams

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944780#comment-15944780
 ] 

ASF GitHub Bot commented on KAFKA-4965:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2750

KAFKA-4965: set internal.leave.group.on.close to false in StreamsConfig

Set the internal consumer config internal.leave.group.on.close in 
`StreamsConfig`. This is to reduce the number of rebalances we get during 
bounces.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-4965

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2750


commit 482c2259ef20f436fcc2dfe1a329b54cbc733db7
Author: Damian Guy 
Date:   2017-03-28T08:40:04Z

set internal.leave.group.on.close to false in StreamsConfig




> set internal.leave.group.on.close to false in KafkaStreams
> --
>
> Key: KAFKA-4965
> URL: https://issues.apache.org/jira/browse/KAFKA-4965
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> We added the internal Consumer Config {{leave.group.on.close}} so that we can 
> avoid unnecessary rebalances during bounces in streams etc. We just need to 
> set this config to false.





Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Damian Guy
Matthias,

Yes i know.

Thanks,
Damian

On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax  wrote:

> Damian,
>
> about "rebalance immediately" on timeout -- I guess, that's a different
> case as no LeaveGroupRequest will be sent. Thus, the broker should be
> able to distinguish both cases easily, and apply the delay only if it
> received the LeaveGroupRequest but not if a consumer times out.
>
> Does this make sense?
>
> -Matthias
>
> On 3/27/17 1:56 AM, Damian Guy wrote:
> > @Becket
> >
> > Thanks for the feedback. Yes, i like the idea of extending the delay as
> > each new consumer joins the group. Though, i think this could be done
> with
> > either a consumer or broker side config. But i get your point that some
> > consumers in the group can be misconfigured.
> >
> > @Matthias & @Eno - yes we could probably do something similar if the
> member
> > has sent the LeaveGroupRequest. I'm not sure it would be valid if the
> > member crashed, hence session.timeout would come into play, we'd probably
> > want to rebalance immediately. I'd be interested in hearing thoughts from
> > other core kafka folks on this one.
> >
> > Thanks,
> > Damian
> >
> >
> >
> > On Fri, 24 Mar 2017 at 23:01 Becket Qin  wrote:
> >
> >> Hi Matthias,
> >>
> >> Yes, that was what I was thinking. We will keep delay it until either
> >> reaching the rebalance timeout or no new consumer joins in that small
> delay
> >> which is configured on the broker side.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax  >
> >> wrote:
> >>
> >>> @Becket:
> >>>
> >>> I am not sure, if I understand this correctly. Instead of applying a
> >>> fixed delay, that starts when the first consumer of an (empty) group
> >>> joins, you suggest to re-trigger/re-set the delay each time a new
> >>> consumer joins?
> >>>
> >>> This sound like a good strategy to me, if the config is on the broker
> >> side.
> >>>
> >>> @Eno:
> >>>
> >>> I think that's a valid point and I like this idea!
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/24/17 1:23 PM, Eno Thereska wrote:
>  Thanks Damian,
> 
>  This KIP deals with the initial phase only. What about the cases when
> >>> several consumers leave a group? Won't there be several expensive
> >>> rebalances then as well? I'm wondering if it makes sense for the delay
> to
> >>> hold anytime the "set" of consumers in a group changes, be it addition
> to
> >>> the group or removal from group.
> 
>  Thanks
>  Eno
> 
> 
> > On 24 Mar 2017, at 20:04, Becket Qin  wrote:
> >
> > Thanks for the KIP, Damian.
> >
> > My two cents on this. It seems there are two things worth thinking
> >> here:
> >
> > 1. Better rebalance timing. We will try to rebalance only when all
> the
> > consumers in a group have joined. The challenge would be someone has
> >> to
> > define what does ALL consumers mean, it could either be a time or
> >>> number of
> > consumers, etc.
> >
> > 2. Avoid frequent rebalance. For example, if there are 100 consumers
> >> in
> >>> a
> > group, today, in the worst case, we may end up with 100 rebalances
> >> even
> >>> if
> > all the consumers joined the group in a reasonably small amount of
> >> time.
> > Frequent rebalance is also a bad thing for brokers.
> >
> > Having a client side configuration may solve problem 1 better because
> >>> each
> > consumer group can potentially configure their own timing. However,
> it
> >>> does
> > not really prevent frequent rebalance in general because some of the
> > consumers can be misconfigured. (This may have something to do with
> >>> KIP-124
> > as well. But if quota is applied on the JoinGroup/SyncGroup request
> it
> >>> may
> > cause some unwanted cascading effects.)
> >
> > Having a broker side configuration may result in less flexibility for
> >>> each
> > consumer group, but it can prevent frequent rebalance better. I think
> >>> with
> > some reasonable design, the rebalance timing issue can be resolved on
> >>> the
> > broker side as well. Matthias had a good point on extending the delay
> >>> when
> > a new consumer joins a group (we actually did something similar to
> >> batch
> > ISR change propagation). For example, let's say on the broker side,
> we
> >>> will
> > always delay 2 seconds each time we see a new consumer joining a
> >>> consumer
> > group. This would probably work for most of the consumer groups and
> >> will
> > also limit the rebalance frequency to protect the brokers.
> >
> > I am not sure about the streams use case here, but if something like
> 2
> > seconds of delay is acceptable for streams, I would prefer adding the
> > configuration to the broker so that we can address both problems.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Mar 24, 2017 at 5:30 AM, Damian Guy 
> 

Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Damian Guy
All,
I'd like to get this back to the original discussion about Delaying initial
consumer group rebalance.
I think i'm leaning towards sticking with the broker config and changing
the delay so that the timer starts again when a new consumer joins the
group. What are peoples thoughts on that?

Doing something similar on leave is valid, but i'd prefer to consider it
separately from this.

Thanks,
Damian
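To make that concrete, a rough sketch of the behaviour being proposed (illustrative only; the real logic would live in the broker's group coordinator, and all names below are placeholders):

{code}
// Sketch of the proposal: delay the initial rebalance, extend the delay each time
// a new member joins, but never wait longer than the group's rebalance timeout.
class InitialRebalanceDelay {
    private final long delayMs;            // broker-side config, e.g. a few seconds
    private final long rebalanceTimeoutMs; // upper bound taken from the group
    private long firstJoinMs = -1;
    private long rebalanceAtMs = -1;

    InitialRebalanceDelay(final long delayMs, final long rebalanceTimeoutMs) {
        this.delayMs = delayMs;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
    }

    // Called whenever a consumer joins the still-forming group.
    void onMemberJoin(final long nowMs) {
        if (firstJoinMs < 0) {
            firstJoinMs = nowMs;
        }
        // Restart the timer on every join, capped at the rebalance timeout.
        rebalanceAtMs = Math.min(nowMs + delayMs, firstJoinMs + rebalanceTimeoutMs);
    }

    // The coordinator completes the join phase once this returns true.
    boolean shouldCompleteJoin(final long nowMs) {
        return rebalanceAtMs >= 0 && nowMs >= rebalanceAtMs;
    }
}
{code}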

On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:

> Matthias,
>
> Yes i know.
>
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
> wrote:
>
> Damian,
>
> about "rebalance immediately" on timeout -- I guess, that's a different
> case as no LeaveGroupRequest will be sent. Thus, the broker should be
> able to distinguish both cases easily, and apply the delay only if it
> received the LeaveGroupRequest but not if a consumer times out.
>
> Does this make sense?
>
> -Matthias
>
> On 3/27/17 1:56 AM, Damian Guy wrote:
> > @Becket
> >
> > Thanks for the feedback. Yes, i like the idea of extending the delay as
> > each new consumer joins the group. Though, i think this could be done
> with
> > either a consumer or broker side config. But i get your point that some
> > consumers in the group can be misconfigured.
> >
> > @Matthias & @Eno - yes we could probably do something similar if the
> member
> > has sent the LeaveGroupRequest. I'm not sure it would be valid if the
> > member crashed, hence session.timeout would come into play, we'd probably
> > want to rebalance immediately. I'd be interested in hearing thoughts from
> > other core kafka folks on this one.
> >
> > Thanks,
> > Damian
> >
> >
> >
> > On Fri, 24 Mar 2017 at 23:01 Becket Qin  wrote:
> >
> >> Hi Matthias,
> >>
> >> Yes, that was what I was thinking. We will keep delay it until either
> >> reaching the rebalance timeout or no new consumer joins in that small
> delay
> >> which is configured on the broker side.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax  >
> >> wrote:
> >>
> >>> @Becket:
> >>>
> >>> I am not sure, if I understand this correctly. Instead of applying a
> >>> fixed delay, that starts when the first consumer of an (empty) group
> >>> joins, you suggest to re-trigger/re-set the delay each time a new
> >>> consumer joins?
> >>>
> >>> This sound like a good strategy to me, if the config is on the broker
> >> side.
> >>>
> >>> @Eno:
> >>>
> >>> I think that's a valid point and I like this idea!
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/24/17 1:23 PM, Eno Thereska wrote:
>  Thanks Damian,
> 
>  This KIP deals with the initial phase only. What about the cases when
> >>> several consumers leave a group? Won't there be several expensive
> >>> rebalances then as well? I'm wondering if it makes sense for the delay
> to
> >>> hold anytime the "set" of consumers in a group changes, be it addition
> to
> >>> the group or removal from group.
> 
>  Thanks
>  Eno
> 
> 
> > On 24 Mar 2017, at 20:04, Becket Qin  wrote:
> >
> > Thanks for the KIP, Damian.
> >
> > My two cents on this. It seems there are two things worth thinking
> >> here:
> >
> > 1. Better rebalance timing. We will try to rebalance only when all
> the
> > consumers in a group have joined. The challenge would be someone has
> >> to
> > define what does ALL consumers mean, it could either be a time or
> >>> number of
> > consumers, etc.
> >
> > 2. Avoid frequent rebalance. For example, if there are 100 consumers
> >> in
> >>> a
> > group, today, in the worst case, we may end up with 100 rebalances
> >> even
> >>> if
> > all the consumers joined the group in a reasonably small amount of
> >> time.
> > Frequent rebalance is also a bad thing for brokers.
> >
> > Having a client side configuration may solve problem 1 better because
> >>> each
> > consumer group can potentially configure their own timing. However,
> it
> >>> does
> > not really prevent frequent rebalance in general because some of the
> > consumers can be misconfigured. (This may have something to do with
> >>> KIP-124
> > as well. But if quota is applied on the JoinGroup/SyncGroup request
> it
> >>> may
> > cause some unwanted cascading effects.)
> >
> > Having a broker side configuration may result in less flexibility for
> >>> each
> > consumer group, but it can prevent frequent rebalance better. I think
> >>> with
> > some reasonable design, the rebalance timing issue can be resolved on
> >>> the
> > broker side as well. Matthias had a good point on extending the delay
> >>> when
> > a new consumer joins a group (we actually did something similar to
> >> batch
> > ISR change propagation). For example, let's say on the broker side,
> we
> >>> will
> > always delay 2 seconds each time we see a new consumer joining a
> >>> consumer
> > group. This would probably work for most of the 

[jira] [Created] (KAFKA-4966) Producer maybe throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread Bo Wang (JIRA)
Bo Wang created KAFKA-4966:
--

 Summary: Producer maybe throw a NullPointerException under a 
network environment where packet loss and error packets exist.
 Key: KAFKA-4966
 URL: https://issues.apache.org/jira/browse/KAFKA-4966
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.0, 0.10.0.1
Reporter: Bo Wang


2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | producer-1] 
|  Uncaught error in request completion:  | 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
java.lang.NullPointerException
at 
org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
at java.lang.Thread.run(Thread.java:745)





[jira] [Updated] (KAFKA-4966) Producer throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread Bo Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bo Wang updated KAFKA-4966:
---
Summary: Producer throw a NullPointerException under a network environment 
where packet loss and error packets exist.  (was: Producer maybe throw a 
NullPointerException under a network environment where packet loss and error 
packets exist.)

> Producer throw a NullPointerException under a network environment where 
> packet loss and error packets exist.
> 
>
> Key: KAFKA-4966
> URL: https://issues.apache.org/jira/browse/KAFKA-4966
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1, 0.10.2.0
>Reporter: Bo Wang
>
> 2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | 
> producer-1] |  Uncaught error in request completion:  | 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
>   at java.lang.Thread.run(Thread.java:745)





[GitHub] kafka pull request #2751: Document ordering contract of iterator for window ...

2017-03-28 Thread miguno
GitHub user miguno opened a pull request:

https://github.com/apache/kafka/pull/2751

Document ordering contract of iterator for window stores and session stores



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/miguno/kafka 
trunk-streams-window-iterator-doc-fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2751


commit 39fa8ea770663398238d456801329bbfad5c2d6a
Author: Michael G. Noll 
Date:   2017-03-28T09:10:18Z

Document ordering contract of iterator for window stores and session stores






[jira] [Commented] (KAFKA-4966) Producer throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944879#comment-15944879
 ] 

Ismael Juma commented on KAFKA-4966:


Thanks for the report. Can you specify the Kafka Producer version for the 
stacktrace posted? There are two versions listed in "Affects version", so 
that's why I'm asking.

> Producer throw a NullPointerException under a network environment where 
> packet loss and error packets exist.
> 
>
> Key: KAFKA-4966
> URL: https://issues.apache.org/jira/browse/KAFKA-4966
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1, 0.10.2.0
>Reporter: Bo Wang
>
> 2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | 
> producer-1] |  Uncaught error in request completion:  | 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
>   at java.lang.Thread.run(Thread.java:745)





[jira] [Assigned] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

2017-03-28 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3758:
---

Assignee: Eno Thereska  (was: Guozhang Wang)

> KStream job fails to recover after Kafka broker stopped
> ---
>
> Key: KAFKA-3758
> URL: https://issues.apache.org/jira/browse/KAFKA-3758
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Eno Thereska
> Attachments: muon.log.1.gz
>
>
> We've been doing some testing of a fairly complex KStreams job and under load 
> it seems the job fails to rebalance + recover if we shut down one of the 
> kafka brokers. The test we were running had a 3-node kafka cluster where each 
> topic had at least a replication factor of 2, and we terminated one of the 
> nodes.
> Attached is the full log, the root exception seems to be contention on the 
> lock on the state directory. The job continues to try to recover but throws 
> errors relating to locks over and over. Restarting the job itself resolves 
> the problem.
>  1702 org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> creating the state manager
>  1703 at 
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:71)
>  1704 at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:86)
>  1705 at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>  1706 at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>  1707 at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>  1708 at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>  1709 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>  1710 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>  1711 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>  1712 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1713 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1714 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>  1715 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1716 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1717 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>  1718 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>  1719 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>  1720 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>  1721 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>  1722 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>  1723 at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>  1724 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>  1725 at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>  1726 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>  1727 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  1728 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>  1729 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>  1730 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>  1731 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordina

[GitHub] kafka pull request #2752: Using English characters to prevent garbled charac...

2017-03-28 Thread tyronecai
GitHub user tyronecai opened a pull request:

https://github.com/apache/kafka/pull/2752

Using English characters to prevent garbled characters

to fix garbled characters

Stream as Table: A stream can be considered a changelog of a table, where each 
data record in the stream captures a state change of the table. A stream is 
thus a table in disguise, and it can be easily turned into a "real" table by 
replaying the changelog from beginning to end to reconstruct the table. 
Similarly, in a more general analogy, aggregating data records in a stream - 
such as computing the total number of pageviews by user from a stream of 
pageview events - will return a table (here with the key and the value being 
the user and its corresponding pageview count, respectively).
Table as Stream: A table can be considered a snapshot, at a point in time, of 
the latest value for each key in a stream (a stream's data records are 
key-value pairs). A table is thus a stream in disguise, and it can be easily 
turned into a "real" stream by iterating over each key-value entry in the table.
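As a rough illustration of that duality in the Streams DSL (topic and store names below are made up for the example):

{code}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

final KStreamBuilder builder = new KStreamBuilder();

// Stream -> table: aggregating pageview events per user yields a continuously updated table.
final KStream<String, String> pageviews = builder.stream("pageviews-by-user");
final KTable<String, Long> countsPerUser = pageviews.groupByKey().count("pageview-counts");

// Table -> stream: the table can be turned back into the stream of its change records.
final KStream<String, Long> changelog = countsPerUser.toStream();
{code}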

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tyronecai/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2752.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2752


commit 2f261072ce7a96c67703993c9296a84c4c0b282a
Author: cai.rong 
Date:   2017-03-28T09:22:54Z

Using English characters to prevent garbled






[GitHub] kafka pull request #2753: KAFKA-4966:Producer throw a NullPointerException...

2017-03-28 Thread Aegeaner
GitHub user Aegeaner opened a pull request:

https://github.com/apache/kafka/pull/2753

KAFKA-4966:Producer throw a NullPointerException under a network 
environment where packet loss and error packets exist.

fix KAFKA-4966 NPE in sender complete batch

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aegeaner/kafka KAFKA-4966

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2753.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2753


commit 4c063fda99096cc07deb2e1a424f17b7ed3aeee4
Author: Aegeaner 
Date:   2017-03-28T11:01:29Z

fix complete batch NPE






[jira] [Commented] (KAFKA-4966) Producer throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944946#comment-15944946
 ] 

ASF GitHub Bot commented on KAFKA-4966:
---

GitHub user Aegeaner opened a pull request:

https://github.com/apache/kafka/pull/2753

KAFKA-4966:Producer throw a NullPointerException under a network 
environment where packet loss and error packets exist.

fix KAFKA-4966 NPE in sender complete batch

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aegeaner/kafka KAFKA-4966

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2753.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2753


commit 4c063fda99096cc07deb2e1a424f17b7ed3aeee4
Author: Aegeaner 
Date:   2017-03-28T11:01:29Z

fix complete batch NPE




> Producer throw a NullPointerException under a network environment where 
> packet loss and error packets exist.
> 
>
> Key: KAFKA-4966
> URL: https://issues.apache.org/jira/browse/KAFKA-4966
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1, 0.10.2.0
>Reporter: Bo Wang
>
> 2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | 
> producer-1] |  Uncaught error in request completion:  | 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4960) Invalid state store exception

2017-03-28 Thread j yeargers (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

j yeargers updated KAFKA-4960:
--
Attachment: text.html

This is essentially all the app does:


ObjectMapper objectMapper = new ObjectMapper();
KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream<String, String> sourceStream =
    kStreamBuilder.stream(yamlConfigRunner.getString("topic_sourcestream"));

RowReducer rowReducer = new RowReducer();
KTable<Windowed<String>, String> outTable = sourceStream.groupByKey().reduce(rowReducer,
    TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
    "HourAggStore");

outTable.foreach((k, v) -> {
    DDog.getDDog().increment("valueCount");
});

KGroupedStream<String, String> kGroupedStream = sourceStream.groupByKey();
kGroupedStream.count("countstore");
kGroupedStream.count(TimeWindows.of(6), "windowed-store-count");

kafkaStreams = new KafkaStreams(kStreamBuilder, config);
kafkaStreams.start();

The 'RowReducer' takes the JSON from the topic, converts it back to an Object 
and combines two List objects.

Mostly this is for me to attempt a distributed query app. It uses 
com.sun.net.httpserver.* as a 'contact point'.

On Mon, Mar 27, 2017 at 5:56 PM, Matthias J. Sax (JIRA) wrote:

[ 
https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944329#comment-15944329
 ]

Matthias J. Sax commented on KAFKA-4960:


Thanks for reporting this. Can you give some more context information? How can 
we reproduce this?







> Invalid state store exception
> -
>
> Key: KAFKA-4960
> URL: https://issues.apache.org/jira/browse/KAFKA-4960
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: j yeargers
> Attachments: text.html
>
>
> Attempts to run windowed aggregation returns this exception:
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN   
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING 
> to NOT_RUNNING
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN   
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING 
> to NOT_RUNNING
> Exception in thread "StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=vi_preproc, 
> partition=1, offset=243574962
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store 
> %s has closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>   at 
> org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>   at 
> org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:84)
>   at 
> org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:35)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamWindowReduceProcessor.process(KStreamWindowReduce.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>   at 
> org.apache.kafka.streams.processor.interna

[jira] [Updated] (KAFKA-4873) Investigate issues uncovered by CORDS

2017-03-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4873:
---
Description: 
"CORDS is a fault-injection system consisting of errfs, a FUSE file system, and 
errbench, a set of workloads and a behaviour inference script for each system 
under test."

Kafka seemed to have issues with single bit errors, see the following for more 
details:

https://blog.acolyer.org/2017/03/08/redundancy-does-not-imply-fault-tolerance-analysis-of-distributed-storage-reactions-to-single-errors-and-corruptions/

All the code seems to be available:

http://research.cs.wisc.edu/adsl/Software/cords/

  was:
"CORDS is a fault-injection system consisting of errfs, a FUSE file system, and 
errbench, a set of workloads and a behaviour inference script for each system 
under test."

Kafka seemed to have issues with single bit errors, see the following for more 
details:

https://blog.acolyer.org/2017/03/08/redundancy-does-not-imply-fault-tolerance-analysis-of-distributed-storage-reactions-to-single-errors-and-corruptions/


> Investigate issues uncovered by CORDS
> -
>
> Key: KAFKA-4873
> URL: https://issues.apache.org/jira/browse/KAFKA-4873
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Critical
>  Labels: reliability
>
> "CORDS is a fault-injection system consisting of errfs, a FUSE file system, 
> and errbench, a set of workloads and a behaviour inference script for each 
> system under test."
> Kafka seemed to have issues with single bit errors, see the following for 
> more details:
> https://blog.acolyer.org/2017/03/08/redundancy-does-not-imply-fault-tolerance-analysis-of-distributed-storage-reactions-to-single-errors-and-corruptions/
> All the code seems to be available:
> http://research.cs.wisc.edu/adsl/Software/cords/





[jira] [Commented] (KAFKA-4460) Consumer stops getting messages when partition leader dies

2017-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945085#comment-15945085
 ] 

Ismael Juma commented on KAFKA-4460:


Hi, would it be possible for you to test a newer version (0.10.2 or trunk) to 
see if the issue still occurs?

> Consumer stops getting messages when partition leader dies
> --
>
> Key: KAFKA-4460
> URL: https://issues.apache.org/jira/browse/KAFKA-4460
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.1
>Reporter: Bernhard Bonigl
>  Labels: reliability
>
> I have a setup consisting of 2 Kafka broker (0 and 1) using a zookeeper, a 
> spring boot application with producers and a spring boot application with 
> consumers.
> The topic has 5 partitions and a replication factor of 2, both brokers are in 
> sync, partitions have alternating leader (although it doesn't matter).
> The spring boot kafka configuration is setup as follows:
> {code}
> kafka.address: localhost:9092,localhost:9093
> kafka.numberOfConsumers: 20
> {code}
> Where Broker 0 uses port 9092 and Broker 1 uses port 9093.
> 
> When sending events they are consumed just fine. When Broker 0 is killed all 
> topics get Broker 1 as their leader, however the consumers stop consuming 
> events until Broker 0 is back. This happens nearly every time, but usually it 
> takes at most 3 attempts of alternatively killing the leading broker to 
> create the error state.
> The console log is getting spammed by the coordinators, it looks like the 
> coordinator representing broker 0 is marked as dead, but instantly 
> rediscovered and used again many many times, and only at the end the other 
> broker is discovered. When the switch works the log is only minimally spammed 
> and the other broker is discovered very quickly.
> [This gist | 
> https://gist.github.com/bonii-xx/2f1c122f643019a1525fbe120e9162d8] contains 
> the log of the application when the problem occurs. The first line is a log 
> of ours indicating a successfully consumed message. After that the Broker 0 
> (localhost:9092) is killed - you can see the log spam I was talking about. At 
> the end localhost:9093 is discovered, however no further messages are 
> consumed. After that I killed the application.
> 
> I also discovered [this | 
> https://stackoverflow.com/questions/39650993/kafka-consumer-abstractcoordinator-discovered-coordinator-java-client]
>  unresolved stackoverflow question, which seems to be the same problem.





[jira] [Commented] (KAFKA-3123) Follower Broker cannot start if offsets are already out of range

2017-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945094#comment-15945094
 ] 

Ismael Juma commented on KAFKA-3123:


[~mimaison], thanks for the comment. Can you share the broker version?

> Follower Broker cannot start if offsets are already out of range
> 
>
> Key: KAFKA-3123
> URL: https://issues.apache.org/jira/browse/KAFKA-3123
> Project: Kafka
>  Issue Type: Bug
>  Components: core, replication
>Affects Versions: 0.9.0.0
>Reporter: Soumyajit Sahu
>Assignee: Soumyajit Sahu
>Priority: Critical
>  Labels: patch
> Fix For: 0.11.0.0
>
> Attachments: 
> 0001-Fix-Follower-crashes-when-offset-out-of-range-during.patch
>
>
> I was trying to upgrade our test Windows cluster from 0.8.1.1 to 0.9.0 one 
> machine at a time. Our logs have just 2 hours of retention. I had re-imaged 
> the test machine under consideration, and got the following error in loop 
> after starting afresh with 0.9.0 broker:
> [2016-01-19 13:57:28,809] WARN [ReplicaFetcherThread-1-169595708], Replica 
> 15588 for partition [EventLogs4,1] reset its fetch offset from 0 to 
> current leader 169595708's start offset 334086 
> (kafka.server.ReplicaFetcherThread)
> [2016-01-19 13:57:28,809] ERROR [ReplicaFetcherThread-1-169595708], Error 
> getting offset for partition [EventLogs4,1] to broker 169595708 
> (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalStateException: Compaction for partition [EXO_EventLogs4,1] 
> cannot be aborted and paused since it is in LogCleaningPaused state.
>   at 
> kafka.log.LogCleanerManager$$anonfun$abortAndPauseCleaning$1.apply$mcV$sp(LogCleanerManager.scala:149)
>   at 
> kafka.log.LogCleanerManager$$anonfun$abortAndPauseCleaning$1.apply(LogCleanerManager.scala:140)
>   at 
> kafka.log.LogCleanerManager$$anonfun$abortAndPauseCleaning$1.apply(LogCleanerManager.scala:140)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.log.LogCleanerManager.abortAndPauseCleaning(LogCleanerManager.scala:140)
>   at kafka.log.LogCleaner.abortAndPauseCleaning(LogCleaner.scala:141)
>   at kafka.log.LogManager.truncateFullyAndStartAt(LogManager.scala:304)
>   at 
> kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:185)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:152)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> I could unblock myself with a code change. I deleted the action for "case s 
> =>" in the LogCleanerManager.scala's abortAndPauseCleaning(). I think we 
> should not throw exception if the state is already LogCleaningAborted or 
> LogCleaningPaused in this function, but instead just let it roll.





[jira] [Commented] (KAFKA-3123) Follower Broker cannot start if offsets are already out of range

2017-03-28 Thread Mickael Maison (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945105#comment-15945105
 ] 

Mickael Maison commented on KAFKA-3123:
---

We're running 0.10.0.1. I've added a comment in the PR with more details: 
https://github.com/apache/kafka/pull/1716#discussion_r108386426

> Follower Broker cannot start if offsets are already out of range
> 
>
> Key: KAFKA-3123
> URL: https://issues.apache.org/jira/browse/KAFKA-3123
> Project: Kafka
>  Issue Type: Bug
>  Components: core, replication
>Affects Versions: 0.9.0.0
>Reporter: Soumyajit Sahu
>Assignee: Soumyajit Sahu
>Priority: Critical
>  Labels: patch
> Fix For: 0.11.0.0
>
> Attachments: 
> 0001-Fix-Follower-crashes-when-offset-out-of-range-during.patch
>
>
> I was trying to upgrade our test Windows cluster from 0.8.1.1 to 0.9.0 one 
> machine at a time. Our logs have just 2 hours of retention. I had re-imaged 
> the test machine under consideration, and got the following error in a loop 
> after starting afresh with the 0.9.0 broker:
> [2016-01-19 13:57:28,809] WARN [ReplicaFetcherThread-1-169595708], Replica 
> 15588 for partition [EventLogs4,1] reset its fetch offset from 0 to 
> current leader 169595708's start offset 334086 
> (kafka.server.ReplicaFetcherThread)
> [2016-01-19 13:57:28,809] ERROR [ReplicaFetcherThread-1-169595708], Error 
> getting offset for partition [EventLogs4,1] to broker 169595708 
> (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalStateException: Compaction for partition [EXO_EventLogs4,1] 
> cannot be aborted and paused since it is in LogCleaningPaused state.
>   at 
> kafka.log.LogCleanerManager$$anonfun$abortAndPauseCleaning$1.apply$mcV$sp(LogCleanerManager.scala:149)
>   at 
> kafka.log.LogCleanerManager$$anonfun$abortAndPauseCleaning$1.apply(LogCleanerManager.scala:140)
>   at 
> kafka.log.LogCleanerManager$$anonfun$abortAndPauseCleaning$1.apply(LogCleanerManager.scala:140)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.log.LogCleanerManager.abortAndPauseCleaning(LogCleanerManager.scala:140)
>   at kafka.log.LogCleaner.abortAndPauseCleaning(LogCleaner.scala:141)
>   at kafka.log.LogManager.truncateFullyAndStartAt(LogManager.scala:304)
>   at 
> kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:185)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:152)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:122)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:122)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:120)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:120)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
>   at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:120)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:93)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> I could unblock myself with a code change. I deleted the action for "case s 
> =>" in LogCleanerManager.scala's abortAndPauseCleaning(). I think we 
> should not throw an exception if the state is already LogCleaningAborted or 
> LogCleaningPaused in this function, but instead just let it roll.
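
To make the suggested change concrete, here is a rough sketch of the idea in plain Java (the real logic lives in the Scala class kafka.log.LogCleanerManager; the class and state names below are only illustrative, taken from the states mentioned in the stack trace):

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; not the actual Scala implementation.
public class CleanerStateSketch {

    enum State { IN_PROGRESS, ABORTED, PAUSED }

    private final Map<String, State> cleaningState = new ConcurrentHashMap<>();

    /**
     * Abort and pause cleaning for a partition. The suggested change is the
     * ABORTED/PAUSED branch: treat a repeated call as a no-op instead of
     * throwing IllegalStateException.
     */
    public void abortAndPauseCleaning(String topicPartition) {
        cleaningState.compute(topicPartition, (tp, state) -> {
            if (state == null) {
                return State.PAUSED;          // nothing in progress: just mark it paused
            }
            switch (state) {
                case IN_PROGRESS:
                    return State.ABORTED;     // signal the in-flight cleaning to abort
                case ABORTED:
                case PAUSED:
                    return state;             // already aborted/paused: let it roll
                default:
                    throw new IllegalStateException("Unexpected cleaner state " + state + " for " + tp);
            }
        });
        // The real implementation also waits until the cleaning work has actually stopped.
    }
}
{code}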





[jira] [Updated] (KAFKA-4882) Remove internal converter configuration from example property files

2017-03-28 Thread Mitch Seymour (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mitch Seymour updated KAFKA-4882:
-
Fix Version/s: 0.10.2.1

> Remove internal converter configuration from example property files
> ---
>
> Key: KAFKA-4882
> URL: https://issues.apache.org/jira/browse/KAFKA-4882
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Mitch Seymour
>  Labels: newbie
> Fix For: 0.10.2.1
>
>
> Our internal converter configuration is shown in 
> connect-distributed.properties and connect-standalone.properties.
> This tempts users to change it.
> In particular, they seem to believe it needs to be identical to key/value 
> converters.
> In reality, Connect doesn't deal well with anything other than schemaless 
> JSON as the internal converter. Users get errors and find it hard to figure 
> out what went wrong (since this is internal, they are not expected to).
> Let's stop tempting users into shooting their own feet?
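
For reference, the internal converter lines in question typically look like the excerpt below (assumed from the 0.10.x example configs; exact contents may differ by version). The proposal is simply to drop them from the example files so the defaults apply:

{code}
# Assumed excerpt from config/connect-distributed.properties (0.10.x era).
# Internal settings - users should not need to change these, which is why the
# ticket proposes removing them from the example files.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
{code}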





[jira] [Commented] (KAFKA-3039) Temporary loss of leader resulted in log being completely truncated

2017-03-28 Thread Ralph Weires (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945156#comment-15945156
 ] 

Ralph Weires commented on KAFKA-3039:
-

Experienced the same problem on v0.10.2.0, during an upgrade of brokers from 
0.8.2.1 to that version. One of the brokers truncated almost all of its data on 
startup, which was its first startup on the new 0.10.2.0 version as part of a 
rolling restart of the cluster. Luckily, it was the only broker that behaved 
like this...

Log snippet from the affected broker, for a sample partition:
[2017-03-28 09:42:01,614] INFO Completed load of log entries-112 with 12 log 
segments and log end offset 354903677 in 14 ms (kafka.log.Log)
[2017-03-28 09:42:42,025] INFO Partition [entries,112] on broker 19: No 
checkpointed highwatermark is found for partition entries-112 
(kafka.cluster.Partition)
[2017-03-28 09:42:48,031] INFO Truncating log entries-112 to offset 0. 
(kafka.log.Log)


> Temporary loss of leader resulted in log being completely truncated
> ---
>
> Key: KAFKA-3039
> URL: https://issues.apache.org/jira/browse/KAFKA-3039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>  Labels: reliability
>
> We had an event recently where the temporary loss of a leader for a 
> partition (during a manual restart) resulted in the leader coming back with 
> no high watermark state and truncating its log to zero. Logs (attached below) 
> indicate that it did have the data but not the commit state. How is this 
> possible?
> Leader (broker 3)
> [2015-12-18 21:19:44,666] INFO Completed load of log messages-14 with log end 
> offset 14175963374 (kafka.log.Log)
> [2015-12-18 21:19:45,170] INFO Partition [messages,14] on broker 3: No 
> checkpointed highwatermark is found for partition [messages,14] 
> (kafka.cluster.Partition)
> [2015-12-18 21:19:45,238] INFO Truncating log messages-14 to offset 0. 
> (kafka.log.Log)
> [2015-12-18 21:20:34,066] INFO Partition [messages,14] on broker 3: Expanding 
> ISR for partition [messages,14] from 3 to 3,10 (kafka.cluster.Partition)
> Replica (broker 10)
> [2015-12-18 21:19:19,525] INFO Partition [messages,14] on broker 10: 
> Shrinking ISR for partition [messages,14] from 3,10,4 to 10,4 
> (kafka.cluster.Partition)
> [2015-12-18 21:20:34,049] ERROR [ReplicaFetcherThread-0-3], Current offset 
> 14175984203 for partition [messages,14] out of range; reset offset to 35977 
> (kafka.server.ReplicaFetcherThread)
> [2015-12-18 21:20:34,033] WARN [ReplicaFetcherThread-0-3], Replica 10 for 
> partition [messages,14] reset its fetch offset from 14175984203 to current 
> leader 3's latest offset 35977 (kafka.server.ReplicaFetcherThread)
> Some relevant config parameters:
> offsets.topic.replication.factor = 3
> offsets.commit.required.acks = -1
> replica.high.watermark.checkpoint.interval.ms = 5000
> unclean.leader.election.enable = false





[jira] [Created] (KAFKA-4967) java.io.EOFException Error while committing offsets

2017-03-28 Thread Upendra Yadav (JIRA)
Upendra Yadav created KAFKA-4967:


 Summary: java.io.EOFException Error while committing offsets
 Key: KAFKA-4967
 URL: https://issues.apache.org/jira/browse/KAFKA-4967
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.0.1
 Environment: OS : CentOS
Reporter: Upendra Yadav


kafka server and client : 0.10.0.1

kafka server side configuration :
listeners=PLAINTEXT://:9092
#the configuration below is for old clients that existed before; all 
clients have now moved to the latest kafka client - 0.10.0.1
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false

Some of configurations for kafka consumer :
auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_172_19_255_187_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000

I'm getting the exception below on commit offset.
The consumer process is still running after this exception,
but when I check the offset position through the kafka shell scripts it shows the 
old position (Could not fetch offset from topic1_group1 partition [topic1,0] due 
to missing offset data in zookeeper). After some time, when the 2nd commit comes, 
it gets updated.

Because dual commit is enabled, I think the Kafka-side position gets updated 
successfully both times.

ERROR kafka.consumer.ZookeeperConsumerConnector: [], Error while 
committing offsets.
java.io.EOFException
at 
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at 
kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at 
kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
at 
kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
at 
kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
at 
com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
at 
com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)
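
For reference, a minimal sketch of the old high-level consumer setup described above and the explicit commit that triggers the error (illustrative Java; zookeeper.connect is an assumed placeholder, and this is not the reporter's KafkaHLConsumer code):

{code}
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class CommitOffsetSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");              // assumption: not shown in the report
        props.put("group.id", "crm_172_19_255_187_hadoop_tables");
        props.put("auto.commit.enable", "false");
        props.put("auto.offset.reset", "smallest");
        props.put("offsets.storage", "kafka");
        props.put("dual.commit.enabled", "true");                      // commit to both Kafka and ZooKeeper

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ... create streams and consume messages here ...

        // Explicit commit. Per the report, a dropped broker connection shows up as the
        // EOFException logged by ZookeeperConsumerConnector; the call itself returns,
        // and the ZooKeeper-side offset only catches up on the next successful commit.
        connector.commitOffsets();

        connector.shutdown();
    }
}
{code}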





[jira] [Commented] (KAFKA-4955) Add network handler thread utilization to request quota calculation

2017-03-28 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945223#comment-15945223
 ] 

Rajini Sivaram commented on KAFKA-4955:
---

I have included this in the KAFKA-4954 PR 
(https://github.com/apache/kafka/pull/2744) since it is a relatively small 
change.

> Add network handler thread utilization to request quota calculation
> ---
>
> Key: KAFKA-4955
> URL: https://issues.apache.org/jira/browse/KAFKA-4955
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.11.0.0
>
>
> Add network thread utilization time to the metrics used for throttling based 
> on CPU utilization for requests. This time will be taken into account for the 
> throttling decision made when request handler thread time is recorded for the 
> subsequent request on the connection.
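
As a rough illustration of the intent (a sketch only, with assumed names and semantics, not the actual quota implementation): the broker would accumulate both the network-thread and request-handler-thread time a connection has used, and convert any excess over its CPU-time quota into a throttle delay applied to a subsequent request.

{code}
// Illustrative sketch with assumed semantics; not the real request quota code.
public class RequestQuotaSketch {

    private final double quotaFraction;   // assumed: allowed fraction of one thread's time
    private final long windowMs;          // measurement window

    private long windowStartMs;
    private double usedTimeMs;            // network thread time + request handler thread time

    public RequestQuotaSketch(double quotaFraction, long windowMs, long nowMs) {
        this.quotaFraction = quotaFraction;
        this.windowMs = windowMs;
        this.windowStartMs = nowMs;
    }

    public synchronized void recordNetworkThreadTimeMs(double millis) { usedTimeMs += millis; }

    public synchronized void recordHandlerThreadTimeMs(double millis) { usedTimeMs += millis; }

    /** Throttle time to apply to the next request on this connection; 0 if within quota. */
    public synchronized long throttleTimeMs(long nowMs) {
        if (nowMs - windowStartMs > windowMs) {   // roll over to a fresh measurement window
            windowStartMs = nowMs;
            usedTimeMs = 0;
        }
        double allowedMs = quotaFraction * Math.max(1, nowMs - windowStartMs);
        double excessMs = usedTimeMs - allowedMs;
        return excessMs > 0 ? (long) excessMs : 0;
    }
}
{code}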





Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Matthias J. Sax
I don't understand why you want to split join/leave into two parts...
But it's up to you, I guess.

+1 for broker config plus "retriggering" delay


-Matthias

On 3/28/17 1:53 AM, Damian Guy wrote:
> All,
> I'd like to get this back to the original discussion about Delaying initial
> consumer group rebalance.
> I think i'm leaning towards sticking with the broker config and changing
> the delay so that the timer starts again when a new consumer joins the
> group. What are peoples thoughts on that?
> 
> Doing something similar on leave is valid, but i'd prefer to consider it
> separately from this.
> 
> Thanks,
> Damian
> 
> On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:
> 
>> Matthias,
>>
>> Yes i know.
>>
>> Thanks,
>> Damian
>>
>> On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
>> wrote:
>>
>> Damian,
>>
>> about "rebalance immediately" on timeout -- I guess, that's a different
>> case as no LeaveGroupRequest will be sent. Thus, the broker should be
>> able to distinguish both cases easily, and apply the delay only if it
>> received the LeaveGroupRequest but not if a consumer times out.
>>
>> Does this make sense?
>>
>> -Matthias
>>
>> On 3/27/17 1:56 AM, Damian Guy wrote:
>>> @Becket
>>>
>>> Thanks for the feedback. Yes, i like the idea of extending the delay as
>>> each new consumer joins the group. Though, i think this could be done
>> with
>>> either a consumer or broker side config. But i get your point that some
>>> consumers in the group can be misconfigured.
>>>
>>> @Matthias & @Eno - yes we could probably do something similar if the
>> member
>>> has sent the LeaveGroupRequest. I'm not sure it would be valid if the
>>> member crashed, hence session.timeout would come into play, we'd probably
>>> want to rebalance immediately. I'd be interested in hearing thoughts from
>>> other core kafka folks on this one.
>>>
>>> Thanks,
>>> Damian
>>>
>>>
>>>
>>> On Fri, 24 Mar 2017 at 23:01 Becket Qin  wrote:
>>>
 Hi Matthias,

 Yes, that was what I was thinking. We will keep delay it until either
 reaching the rebalance timeout or no new consumer joins in that small
>> delay
 which is configured on the broker side.

 Thanks,

 Jiangjie (Becket) Qin

 On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax >>
 wrote:

> @Becket:
>
> I am not sure, if I understand this correctly. Instead of applying a
> fixed delay, that starts when the first consumer of an (empty) group
> joins, you suggest to re-trigger/re-set the delay each time a new
> consumer joins?
>
> This sound like a good strategy to me, if the config is on the broker
 side.
>
> @Eno:
>
> I think that's a valid point and I like this idea!
>
>
> -Matthias
>
>
> On 3/24/17 1:23 PM, Eno Thereska wrote:
>> Thanks Damian,
>>
>> This KIP deals with the initial phase only. What about the cases when
> several consumers leave a group? Won't there be several expensive
> rebalances then as well? I'm wondering if it makes sense for the delay
>> to
> hold anytime the "set" of consumers in a group changes, be it addition
>> to
> the group or removal from group.
>>
>> Thanks
>> Eno
>>
>>
>>> On 24 Mar 2017, at 20:04, Becket Qin  wrote:
>>>
>>> Thanks for the KIP, Damian.
>>>
>>> My two cents on this. It seems there are two things worth thinking
 here:
>>>
>>> 1. Better rebalance timing. We will try to rebalance only when all
>> the
>>> consumers in a group have joined. The challenge would be someone has
 to
>>> define what does ALL consumers mean, it could either be a time or
> number of
>>> consumers, etc.
>>>
>>> 2. Avoid frequent rebalance. For example, if there are 100 consumers
 in
> a
>>> group, today, in the worst case, we may end up with 100 rebalances
 even
> if
>>> all the consumers joined the group in a reasonably small amount of
 time.
>>> Frequent rebalance is also a bad thing for brokers.
>>>
>>> Having a client side configuration may solve problem 1 better because
> each
>>> consumer group can potentially configure their own timing. However,
>> it
> does
>>> not really prevent frequent rebalance in general because some of the
>>> consumers can be misconfigured. (This may have something to do with
> KIP-124
>>> as well. But if quota is applied on the JoinGroup/SyncGroup request
>> it
> may
>>> cause some unwanted cascading effects.)
>>>
>>> Having a broker side configuration may result in less flexibility for
> each
>>> consumer group, but it can prevent frequent rebalance better. I think
> with
>>> some reasonable design, the rebalance timing issue can be resolved on
> the
>>> broker side as well. Matthias had a good point on extending the delay
> when
>>> a new consumer joins a group (we actually did something si

Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Ismael Juma
Is 3 seconds the right default if the timer gets reset after each consumer
joins? Maybe we can lower the default value given the new approach.

Ismael

On Tue, Mar 28, 2017 at 9:53 AM, Damian Guy  wrote:

> All,
> I'd like to get this back to the original discussion about Delaying initial
> consumer group rebalance.
> I think i'm leaning towards sticking with the broker config and changing
> the delay so that the timer starts again when a new consumer joins the
> group. What are peoples thoughts on that?
>
> Doing something similar on leave is valid, but i'd prefer to consider it
> separately from this.
>
> Thanks,
> Damian
>
> On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:
>
> > Matthias,
> >
> > Yes i know.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
> > wrote:
> >
> > Damian,
> >
> > about "rebalance immediately" on timeout -- I guess, that's a different
> > case as no LeaveGroupRequest will be sent. Thus, the broker should be
> > able to distinguish both cases easily, and apply the delay only if it
> > received the LeaveGroupRequest but not if a consumer times out.
> >
> > Does this make sense?
> >
> > -Matthias
> >
> > On 3/27/17 1:56 AM, Damian Guy wrote:
> > > @Becket
> > >
> > > Thanks for the feedback. Yes, i like the idea of extending the delay as
> > > each new consumer joins the group. Though, i think this could be done
> > with
> > > either a consumer or broker side config. But i get your point that some
> > > consumers in the group can be misconfigured.
> > >
> > > @Matthias & @Eno - yes we could probably do something similar if the
> > member
> > > has sent the LeaveGroupRequest. I'm not sure it would be valid if the
> > > member crashed, hence session.timeout would come into play, we'd
> probably
> > > want to rebalance immediately. I'd be interested in hearing thoughts
> from
> > > other core kafka folks on this one.
> > >
> > > Thanks,
> > > Damian
> > >
> > >
> > >
> > > On Fri, 24 Mar 2017 at 23:01 Becket Qin  wrote:
> > >
> > >> Hi Matthias,
> > >>
> > >> Yes, that was what I was thinking. We will keep delay it until either
> > >> reaching the rebalance timeout or no new consumer joins in that small
> > delay
> > >> which is configured on the broker side.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > >> wrote:
> > >>
> > >>> @Becket:
> > >>>
> > >>> I am not sure, if I understand this correctly. Instead of applying a
> > >>> fixed delay, that starts when the first consumer of an (empty) group
> > >>> joins, you suggest to re-trigger/re-set the delay each time a new
> > >>> consumer joins?
> > >>>
> > >>> This sound like a good strategy to me, if the config is on the broker
> > >> side.
> > >>>
> > >>> @Eno:
> > >>>
> > >>> I think that's a valid point and I like this idea!
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 3/24/17 1:23 PM, Eno Thereska wrote:
> >  Thanks Damian,
> > 
> >  This KIP deals with the initial phase only. What about the cases
> when
> > >>> several consumers leave a group? Won't there be several expensive
> > >>> rebalances then as well? I'm wondering if it makes sense for the
> delay
> > to
> > >>> hold anytime the "set" of consumers in a group changes, be it
> addition
> > to
> > >>> the group or removal from group.
> > 
> >  Thanks
> >  Eno
> > 
> > 
> > > On 24 Mar 2017, at 20:04, Becket Qin  wrote:
> > >
> > > Thanks for the KIP, Damian.
> > >
> > > My two cents on this. It seems there are two things worth thinking
> > >> here:
> > >
> > > 1. Better rebalance timing. We will try to rebalance only when all
> > the
> > > consumers in a group have joined. The challenge would be someone
> has
> > >> to
> > > define what does ALL consumers mean, it could either be a time or
> > >>> number of
> > > consumers, etc.
> > >
> > > 2. Avoid frequent rebalance. For example, if there are 100
> consumers
> > >> in
> > >>> a
> > > group, today, in the worst case, we may end up with 100 rebalances
> > >> even
> > >>> if
> > > all the consumers joined the group in a reasonably small amount of
> > >> time.
> > > Frequent rebalance is also a bad thing for brokers.
> > >
> > > Having a client side configuration may solve problem 1 better
> because
> > >>> each
> > > consumer group can potentially configure their own timing. However,
> > it
> > >>> does
> > > not really prevent frequent rebalance in general because some of
> the
> > > consumers can be misconfigured. (This may have something to do with
> > >>> KIP-124
> > > as well. But if quota is applied on the JoinGroup/SyncGroup request
> > it
> > >>> may
> > > cause some unwanted cascading effects.)
> > >
> > > Having a broker side configuration may result in less flexibility
> for
> > >>> each
> > > consumer group, but it can prevent f

LDAP with Kafka?

2017-03-28 Thread gregory grey
Hello everyone.

I'm a bit confused about the status of LDAP auth.

The documentation says "AFAICT there is no SASL mechanism covering LDAP
(and the Java SASL library doesn't support it, at any rate)".

The linked Java library documentation, though, seems to indicate the opposite.

Can anyone clarify this, please?


Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Damian Guy
@Ismael - yeah sure we can reduce the default, though i'm not sure what the
"right" default would be.

On Tue, 28 Mar 2017 at 15:40 Ismael Juma  wrote:

> Is 3 seconds the right default if the timer gets reset after each consumer
> joins? Maybe we can lower the default value given the new approach.
>
> Ismael
>
> On Tue, Mar 28, 2017 at 9:53 AM, Damian Guy  wrote:
>
> > All,
> > I'd like to get this back to the original discussion about Delaying
> initial
> > consumer group rebalance.
> > I think i'm leaning towards sticking with the broker config and changing
> > the delay so that the timer starts again when a new consumer joins the
> > group. What are peoples thoughts on that?
> >
> > Doing something similar on leave is valid, but i'd prefer to consider it
> > separately from this.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:
> >
> > > Matthias,
> > >
> > > Yes i know.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
> > > wrote:
> > >
> > > Damian,
> > >
> > > about "rebalance immediately" on timeout -- I guess, that's a different
> > > case as no LeaveGroupRequest will be sent. Thus, the broker should be
> > > able to distinguish both cases easily, and apply the delay only if it
> > > received the LeaveGroupRequest but not if a consumer times out.
> > >
> > > Does this make sense?
> > >
> > > -Matthias
> > >
> > > On 3/27/17 1:56 AM, Damian Guy wrote:
> > > > @Becket
> > > >
> > > > Thanks for the feedback. Yes, i like the idea of extending the delay
> as
> > > > each new consumer joins the group. Though, i think this could be done
> > > with
> > > > either a consumer or broker side config. But i get your point that
> some
> > > > consumers in the group can be misconfigured.
> > > >
> > > > @Matthias & @Eno - yes we could probably do something similar if the
> > > member
> > > > has sent the LeaveGroupRequest. I'm not sure it would be valid if the
> > > > member crashed, hence session.timeout would come into play, we'd
> > probably
> > > > want to rebalance immediately. I'd be interested in hearing thoughts
> > from
> > > > other core kafka folks on this one.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > >
> > > >
> > > > On Fri, 24 Mar 2017 at 23:01 Becket Qin 
> wrote:
> > > >
> > > >> Hi Matthias,
> > > >>
> > > >> Yes, that was what I was thinking. We will keep delay it until
> either
> > > >> reaching the rebalance timeout or no new consumer joins in that
> small
> > > delay
> > > >> which is configured on the broker side.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > >> wrote:
> > > >>
> > > >>> @Becket:
> > > >>>
> > > >>> I am not sure, if I understand this correctly. Instead of applying
> a
> > > >>> fixed delay, that starts when the first consumer of an (empty)
> group
> > > >>> joins, you suggest to re-trigger/re-set the delay each time a new
> > > >>> consumer joins?
> > > >>>
> > > >>> This sound like a good strategy to me, if the config is on the
> broker
> > > >> side.
> > > >>>
> > > >>> @Eno:
> > > >>>
> > > >>> I think that's a valid point and I like this idea!
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>> On 3/24/17 1:23 PM, Eno Thereska wrote:
> > >  Thanks Damian,
> > > 
> > >  This KIP deals with the initial phase only. What about the cases
> > when
> > > >>> several consumers leave a group? Won't there be several expensive
> > > >>> rebalances then as well? I'm wondering if it makes sense for the
> > delay
> > > to
> > > >>> hold anytime the "set" of consumers in a group changes, be it
> > addition
> > > to
> > > >>> the group or removal from group.
> > > 
> > >  Thanks
> > >  Eno
> > > 
> > > 
> > > > On 24 Mar 2017, at 20:04, Becket Qin 
> wrote:
> > > >
> > > > Thanks for the KIP, Damian.
> > > >
> > > > My two cents on this. It seems there are two things worth
> thinking
> > > >> here:
> > > >
> > > > 1. Better rebalance timing. We will try to rebalance only when
> all
> > > the
> > > > consumers in a group have joined. The challenge would be someone
> > has
> > > >> to
> > > > define what does ALL consumers mean, it could either be a time or
> > > >>> number of
> > > > consumers, etc.
> > > >
> > > > 2. Avoid frequent rebalance. For example, if there are 100
> > consumers
> > > >> in
> > > >>> a
> > > > group, today, in the worst case, we may end up with 100
> rebalances
> > > >> even
> > > >>> if
> > > > all the consumers joined the group in a reasonably small amount
> of
> > > >> time.
> > > > Frequent rebalance is also a bad thing for brokers.
> > > >
> > > > Having a client side configuration may solve problem 1 better
> > because
> > > >>> each
> > > > consumer group can potentially configure their own timing.
> However,
> > > it
> 

[jira] [Commented] (KAFKA-4689) OffsetValidationTest fails validation with "Current position greater than the total number of consumed records"

2017-03-28 Thread Roger Hoover (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945543#comment-15945543
 ] 

Roger Hoover commented on KAFKA-4689:
-

Happened again. Logs here 
http://confluent-kafka-0-10-2-system-test-results.s3-us-west-2.amazonaws.com/2017-03-28--001.1490697484--apache--0.10.2--1e4cab7/OffsetValidationTest/test_consumer_bounce/clean_shutdown%3DFalse.bounce_mode%3Drolling/49.tgz


test_id:
kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=False.bounce_mode=rolling
status: FAIL
run time:   3 minutes 32.756 seconds


Current position 79302 greater than the total number of consumed records 
79299
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 123, in run
data = self.run_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 176, in run_test
return self.test_context.function(self.test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
 line 321, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/client/consumer_test.py",
 line 159, in test_consumer_bounce
(consumer.current_position(partition), consumer.total_consumed())
AssertionError: Current position 79302 greater than the total number of 
consumed records 79299


> OffsetValidationTest fails validation with "Current position greater than the 
> total number of consumed records"
> ---
>
> Key: KAFKA-4689
> URL: https://issues.apache.org/jira/browse/KAFKA-4689
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Reporter: Ewen Cheslack-Postava
>Assignee: Jason Gustafson
>  Labels: system-test-failure
>
> {quote}
> 
> test_id:
> kafkatest.tests.client.consumer_test.OffsetValidationTest.test_consumer_bounce.clean_shutdown=False.bounce_mode=all
> status: FAIL
> run time:   1 minute 49.834 seconds
> Current position greater than the total number of consumed records
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 157, in test_consumer_bounce
> "Current position greater than the total number of consumed records"
> AssertionError: Current position greater than the total number of consumed 
> records
> {quote}
> See also 
> https://issues.apache.org/jira/browse/KAFKA-3513?focusedCommentId=15791790&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15791790
>  which is another instance of this bug, which indicates the issue goes back 
> at least as far as 1/17/2017. Note that I don't think we've seen this in 
> 0.10.1 yet.





[jira] [Reopened] (KAFKA-4755) SimpleBenchmark consume test fails for streams

2017-03-28 Thread Roger Hoover (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roger Hoover reopened KAFKA-4755:
-

Happened again.  Logs here:  
http://confluent-kafka-0-10-2-system-test-results.s3-us-west-2.amazonaws.com/2017-03-28--001.1490697484--apache--0.10.2--1e4cab7/StreamsSimpleBenchmarkTest/test_simple_benchmark/212.tgz




Streams Test process on ubuntu@worker2 took too long to exit
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 123, in run
data = self.run_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 176, in run_test
return self.test_context.function(self.test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
 line 48, in test_simple_benchmark
self.driver.wait()
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/services/streams.py",
 line 99, in wait
self.wait_node(node, timeout_sec)
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/services/streams.py",
 line 103, in wait_node
wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
err_msg="Streams Test process on " + str(node.account) + " took too long to 
exit")
  File 
"/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
 line 36, in wait_until
raise TimeoutError(err_msg)
TimeoutError: Streams Test process on ubuntu@worker2 took too long to exit



> SimpleBenchmark consume test fails for streams
> --
>
> Key: KAFKA-4755
> URL: https://issues.apache.org/jira/browse/KAFKA-4755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.0.0
>
>
> This occurred Feb 10th 2017:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=consume.scale=1
> status: FAIL
> run time:   7 minutes 36.712 seconds
> Streams Test process on ubuntu@worker2 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker2 took too long to exit





[GitHub] kafka pull request #2476: KAFKA-4586; Add purgeDataBefore() API (KIP-107)

2017-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2476




[jira] [Commented] (KAFKA-4586) Add purgeDataBefore() API in AdminClient

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945553#comment-15945553
 ] 

ASF GitHub Bot commented on KAFKA-4586:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2476


> Add purgeDataBefore() API in AdminClient
> 
>
> Key: KAFKA-4586
> URL: https://issues.apache.org/jira/browse/KAFKA-4586
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Dong Lin
>Assignee: Dong Lin
> Fix For: 0.11.0.0
>
>
> Please visit 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>  for motivation etc.





Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-03-28 Thread Mickael Maison
Hi Guozhang,

Thanks for the feedback.

1) By MemoryPool, I mean the implementation added in KIP-72. That will
most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
merged yet.
I've updated the KIP to make it more obvious.

2) I was thinking of passing in the priority when creating the
Coordinator Node (in
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L582).
Then, when calling Selector.connect() (in
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L643),
retrieve it and pass it to the Selector so it uses it when building
the Channel.
The idea was to avoid having to deduce that the connection is for the
Coordinator from the ID and instead have it set explicitly by
AbstractCoordinator (and passed all the way down to the Channel).
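
A rough sketch of that idea (illustrative only; the class and field names below are assumptions, not the KIP-72/KIP-81 classes): the coordinator connection is created with a priority marker, and a high-priority channel is allowed to fall back to a plain heap allocation when the memory pool is exhausted.

{code}
import java.nio.ByteBuffer;

// Illustrative sketch of a "priority" connection for KIP-81; names are assumptions.
class MemoryPoolSketch {
    private long available;

    MemoryPoolSketch(long sizeBytes) { this.available = sizeBytes; }

    /** Returns null when the pool cannot satisfy the request right now. */
    synchronized ByteBuffer tryAllocate(int bytes) {
        if (bytes > available) {
            return null;
        }
        available -= bytes;
        return ByteBuffer.allocate(bytes);
    }
}

class ChannelSketch {
    private final boolean highPriority;   // set when the connection targets the group coordinator
    private final MemoryPoolSketch pool;

    ChannelSketch(boolean highPriority, MemoryPoolSketch pool) {
        this.highPriority = highPriority;
        this.pool = pool;
    }

    ByteBuffer allocateReceiveBuffer(int bytes) {
        ByteBuffer buffer = pool.tryAllocate(bytes);
        if (buffer == null && highPriority) {
            // Coordinator traffic must not be starved when fetch data fills the pool:
            // fall back to a plain heap allocation outside the pool.
            buffer = ByteBuffer.allocate(bytes);
        }
        return buffer;   // may be null for ordinary connections until pool space frees up
    }
}
{code}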

On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang  wrote:
> Mickael,
>
> Sorry for the late review of the KIP. I'm +1 on the proposed change as
> well. Just a few minor comments on the wiki itself:
>
> 1. By the "MemoryPool" are you referring to a new class impl or to reusing "
> org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was
> the latter case, and if yes, could you update the wiki page to make it
> clear?
>
> 2. I think it is sufficient to add the priority to KafkaChannel class, but
> not needed in Node (but one may need to add this parameter to Selector#
> connect). Could you point me to which usage of Node needs to access the
> priority?
>
>
> Guozhang
>
>
> On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison 
> wrote:
>
>> Thanks Jason for the feedback! Yes it makes sense to always use the
>> MemoryPool is we can. I've updated the KIP with the suggestion
>>
>> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson 
>> wrote:
>> > Just a minor comment. The KIP suggests that coordinator responses are
>> > always allocated outside of the memory pool, but maybe we can reserve
>> that
>> > capability for only when the pool does not have enough space? It seems a
>> > little nicer to use the pool if we can. If that seems reasonable, I'm +1
>> on
>> > the KIP. Thanks for the effort!
>> >
>> > -Jason
>> >
>> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
>> mickael.mai...@gmail.com>
>> > wrote:
>> >
>> >> Yes I agree, having a generic flag is more future proof.
>> >> I'll update the KIP in the coming days.
>> >>
>> >> Thanks
>> >>
>> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson 
>> >> wrote:
>> >> > Hey Mickael,
>> >> >
>> >> > The suggestion to add something to Node makes sense. I could imagine
>> for
>> >> > example adding a flag to indicate that the connection has a higher
>> >> > "priority," meaning that we can allocate outside of the memory pool if
>> >> > necessary. That would still be generic even if the only use case is
>> the
>> >> > consumer coordinator. We might also face a similar problem when the
>> >> > producer is sending requests to the transaction coordinator for
>> KIP-98.
>> >> > What do you think?
>> >> >
>> >> > Thanks,
>> >> > Jason
>> >> >
>> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
>> >> mickael.mai...@gmail.com>
>> >> > wrote:
>> >> >
>> >> >> Apologies for the late response.
>> >> >>
>> >> >> Thanks Jason for the suggestion. Yes you are right, the Coordinator
>> >> >> connection is "tagged" with a different id, so we could retrieve it
>> in
>> >> >> NetworkReceive to make the distinction.
>> >> >> However, currently the coordinator connection are made different by
>> >> using:
>> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
>> >> >> for the Node id.
>> >> >>
>> >> >> So to identify Coordinator connections, we'd have to check that the
>> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a
>> bit
>> >> >> hacky ...
>> >> >>
>> >> >> Maybe we could add a constructor to Node that allows to pass in a
>> >> >> sourceId String. That way we could make all the coordinator
>> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
>> >> >> example).
>> >> >> What do you think ?
>> >> >>
>> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
>> ja...@confluent.io>
>> >> >> wrote:
>> >> >> > Good point. The consumer does use a separate connection to the
>> >> >> coordinator,
>> >> >> > so perhaps the connection itself could be tagged for normal heap
>> >> >> allocation?
>> >> >> >
>> >> >> > -Jason
>> >> >> >
>> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
>> >> >> onurkaraman.apa...@gmail.com
>> >> >> >> wrote:
>> >> >> >
>> >> >> >> I only did a quick scan but I wanted to point out what I think is
>> an
>> >> >> >> incorrect assumption in the KIP's caveats:
>> >> >> >> "
>> >> >> >> There is a risk using the MemoryPool that, after we fill up the
>> >> memory
>> >> >> with
>> >> >> >> fetch data, we can starve the coordinator's connection
>> >> >> >> ...
>> >> >> >> To alleviate this issue, only messages larger than 

[jira] [Created] (KAFKA-4968) The NetworkClient should rate-limit its log messages about protocol downgrades

2017-03-28 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-4968:
--

 Summary: The NetworkClient should rate-limit its log messages 
about protocol downgrades
 Key: KAFKA-4968
 URL: https://issues.apache.org/jira/browse/KAFKA-4968
 Project: Kafka
  Issue Type: Bug
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe
Priority: Minor


The NetworkClient should rate-limit its log messages about protocol downgrades.
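
One simple way to rate-limit such a message (an illustrative sketch, not the change in the eventual patch): remember when the downgrade message was last emitted and drop it if that was too recent.

{code}
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative rate-limited logger; not the actual NetworkClient change.
public class RateLimitedLog {

    private static final Logger log = LoggerFactory.getLogger(RateLimitedLog.class);

    private final long minIntervalMs;
    private final AtomicLong lastLogMs = new AtomicLong(0);

    public RateLimitedLog(long minIntervalMs) {
        this.minIntervalMs = minIntervalMs;
    }

    /** Logs at INFO at most once per minIntervalMs; silently drops the message otherwise. */
    public void info(String message, Object... args) {
        long now = System.currentTimeMillis();
        long last = lastLogMs.get();
        if (now - last >= minIntervalMs && lastLogMs.compareAndSet(last, now)) {
            log.info(message, args);
        }
    }
}
{code}

A call site could then use something like new RateLimitedLog(60_000).info("Downgrading request for node {} to an older version", nodeId) so that at most one downgrade message per minute reaches the log.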





[jira] [Updated] (KAFKA-4755) SimpleBenchmark consume test fails for streams

2017-03-28 Thread Roger Hoover (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roger Hoover updated KAFKA-4755:

Priority: Blocker  (was: Major)

> SimpleBenchmark consume test fails for streams
> --
>
> Key: KAFKA-4755
> URL: https://issues.apache.org/jira/browse/KAFKA-4755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> This occurred Feb 10th 2017:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=consume.scale=1
> status: FAIL
> run time:   7 minutes 36.712 seconds
> Streams Test process on ubuntu@worker2 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker2 took too long to exit





[GitHub] kafka pull request #2754: KAFKA-4968. The NetworkClient should rate-limit it...

2017-03-28 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2754

KAFKA-4968. The NetworkClient should rate-limit its log messages abou…

…t protocol downgrades

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4968

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2754


commit 9b5906ba9e240ba74b75017cc5724e180a69e0c4
Author: Colin P. Mccabe 
Date:   2017-03-28T17:28:56Z

KAFKA-4968. The NetworkClient should rate-limit its log messages about 
protocol downgrades






[jira] [Commented] (KAFKA-4968) The NetworkClient should rate-limit its log messages about protocol downgrades

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945616#comment-15945616
 ] 

ASF GitHub Bot commented on KAFKA-4968:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2754

KAFKA-4968. The NetworkClient should rate-limit its log messages abou…

…t protocol downgrades

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4968

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2754


commit 9b5906ba9e240ba74b75017cc5724e180a69e0c4
Author: Colin P. Mccabe 
Date:   2017-03-28T17:28:56Z

KAFKA-4968. The NetworkClient should rate-limit its log messages about 
protocol downgrades




> The NetworkClient should rate-limit its log messages about protocol downgrades
> --
>
> Key: KAFKA-4968
> URL: https://issues.apache.org/jira/browse/KAFKA-4968
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
>
> The NetworkClient should rate-limit its log messages about protocol 
> downgrades.





[jira] [Commented] (KAFKA-4755) SimpleBenchmark consume test fails for streams

2017-03-28 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945604#comment-15945604
 ] 

Eno Thereska commented on KAFKA-4755:
-

So the second test is actually a different failure. The first failure was on 
trunk, this second one is on 0.10.2 branch. It's ok to keep them here for now.

> SimpleBenchmark consume test fails for streams
> --
>
> Key: KAFKA-4755
> URL: https://issues.apache.org/jira/browse/KAFKA-4755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.11.0.0
>
>
> This occurred Feb 10th 2017:
> kafkatest.benchmarks.streams.streams_simple_benchmark_test.StreamsSimpleBenchmarkTest.test_simple_benchmark.test=consume.scale=1
> status: FAIL
> run time:   7 minutes 36.712 seconds
> Streams Test process on ubuntu@worker2 took too long to exit
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py",
>  line 86, in test_simple_benchmark
> self.driver[num].wait()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 102, in wait
> self.wait_node(node, timeout_sec)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
>  line 106, in wait_node
> wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, 
> err_msg="Streams Test process on " + str(node.account) + " took too long to 
> exit")
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
>  line 36, in wait_until
> raise TimeoutError(err_msg)
> TimeoutError: Streams Test process on ubuntu@worker2 took too long to exit





Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-03-28 Thread Guozhang Wang
1) Makes sense.
2) Makes sense. Thanks!

On Tue, Mar 28, 2017 at 10:11 AM, Mickael Maison 
wrote:

> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1) By MemoryPool, I mean the implementation added in KIP-72. That will
> most likely be SimpleMemoryPool, but the PR for KIP-72 has not been
> merged yet.
> I've updated the KIP to make it more obvious.
>
> 2) I was thinking to pass in the priority when creating the
> Coordinator Node (in
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/consumer/internals/
> AbstractCoordinator.java#L582)
> Then when calling Selector.connect() (in
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/clients/NetworkClient.java#L643)
> retrieve it and pass it in the Selector so it uses it when building
> the Channel.
> The idea was to avoid having to deduce the connection is for the
> Coordinator from the ID but instead have it explicitly set by
> AbstractCoordinator (and pass it all the way down to the Channel)
>
> On Tue, Mar 28, 2017 at 1:33 AM, Guozhang Wang  wrote:
> > Mickael,
> >
> > Sorry for the late review of the KIP. I'm +1 on the proposed change as
> > well. Just a few minor comments on the wiki itself:
> >
> > 1. By the "MemoryPool" are you referring to a new class impl or to
> reusing "
> > org.apache.kafka.clients.producer.internals.BufferPool"? I assume it was
> > the latter case, and if yes, could you update the wiki page to make it
> > clear?
> >
> > 2. I think it is sufficient to add the priority to KafkaChannel class,
> but
> > not needed in Node (but one may need to add this parameter to Selector#
> > connect). Could you point me to which usage of Node needs to access the
> > priority?
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 10, 2017 at 9:52 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> >> Thanks Jason for the feedback! Yes it makes sense to always use the
> >> MemoryPool is we can. I've updated the KIP with the suggestion
> >>
> >> On Fri, Mar 10, 2017 at 1:18 AM, Jason Gustafson 
> >> wrote:
> >> > Just a minor comment. The KIP suggests that coordinator responses are
> >> > always allocated outside of the memory pool, but maybe we can reserve
> >> that
> >> > capability for only when the pool does not have enough space? It
> seems a
> >> > little nicer to use the pool if we can. If that seems reasonable, I'm
> +1
> >> on
> >> > the KIP. Thanks for the effort!
> >> >
> >> > -Jason
> >> >
> >> > On Tue, Feb 28, 2017 at 10:09 AM, Mickael Maison <
> >> mickael.mai...@gmail.com>
> >> > wrote:
> >> >
> >> >> Yes I agree, having a generic flag is more future proof.
> >> >> I'll update the KIP in the coming days.
> >> >>
> >> >> Thanks
> >> >>
> >> >> On Tue, Feb 28, 2017 at 5:08 AM, Jason Gustafson  >
> >> >> wrote:
> >> >> > Hey Mickael,
> >> >> >
> >> >> > The suggestion to add something to Node makes sense. I could
> imagine
> >> for
> >> >> > example adding a flag to indicate that the connection has a higher
> >> >> > "priority," meaning that we can allocate outside of the memory
> pool if
> >> >> > necessary. That would still be generic even if the only use case is
> >> the
> >> >> > consumer coordinator. We might also face a similar problem when the
> >> >> > producer is sending requests to the transaction coordinator for
> >> KIP-98.
> >> >> > What do you think?
> >> >> >
> >> >> > Thanks,
> >> >> > Jason
> >> >> >
> >> >> > On Mon, Feb 27, 2017 at 9:09 AM, Mickael Maison <
> >> >> mickael.mai...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Apologies for the late response.
> >> >> >>
> >> >> >> Thanks Jason for the suggestion. Yes you are right, the
> Coordinator
> >> >> >> connection is "tagged" with a different id, so we could retrieve
> it
> >> in
> >> >> >> NetworkReceive to make the distinction.
> >> >> >> However, currently the coordinator connection are made different
> by
> >> >> using:
> >> >> >> Integer.MAX_VALUE - groupCoordinatorResponse.node().id()
> >> >> >> for the Node id.
> >> >> >>
> >> >> >> So to identify Coordinator connections, we'd have to check that
> the
> >> >> >> NetworkReceive source is a value near Integer.MAX_VALUE which is a
> >> bit
> >> >> >> hacky ...
> >> >> >>
> >> >> >> Maybe we could add a constructor to Node that allows to pass in a
> >> >> >> sourceId String. That way we could make all the coordinator
> >> >> >> connections explicit (by setting it to "Coordinator-[ID]" for
> >> >> >> example).
> >> >> >> What do you think ?
> >> >> >>
> >> >> >> On Tue, Jan 24, 2017 at 12:58 AM, Jason Gustafson <
> >> ja...@confluent.io>
> >> >> >> wrote:
> >> >> >> > Good point. The consumer does use a separate connection to the
> >> >> >> coordinator,
> >> >> >> > so perhaps the connection itself could be tagged for normal heap
> >> >> >> allocation?
> >> >> >> >
> >> >> >> > -Jason
> >> >> >> >
> >> >> >> > On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman <
> >> >> >> onurkaraman.apa...@gmail.com
> >> >> >> >> wrote:
> >> >> >>

Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Dong Lin
I think it makes sense to reset the timer when group changes. This allows
broker to possibly wait longer if more consumers are restarted in parallel.
For example, user may start a large number of consumers in sequence using
script. The time for all consumers to send JoinGroupRequest will likely be
proportional to the number of consumers in the group.

I also think it makes sense to only add a broker-side config. I assume the
use-case for this KIP is to reduce unnecessary rebalances if all consumers are
started in parallel with a relatively short difference in start time between
consecutive starts. It means that the gap in time between consecutive
JoinGroupRequests should be small. While it is possible that some consumers
take much longer to initialize than others, so that their JoinGroupRequests
arrive much later, it is hard to optimize for that case without knowing the
variance of this initialization time.

Should we default the value of this delay to 0? This is a safe and
backward-compatible choice. I'm not sure whether client applications are
sensitive to the time from when a consumer starts to when it receives its
first message.

Thanks,
Dong


On Tue, Mar 28, 2017 at 8:29 AM, Damian Guy  wrote:

> @Ismael - yeah sure we can reduce the default, though i'm not sure what the
> "right" default would be.
>
> On Tue, 28 Mar 2017 at 15:40 Ismael Juma  wrote:
>
> > Is 3 seconds the right default if the timer gets reset after each
> consumer
> > joins? Maybe we can lower the default value given the new approach.
> >
> > Ismael
> >
> > On Tue, Mar 28, 2017 at 9:53 AM, Damian Guy 
> wrote:
> >
> > > All,
> > > I'd like to get this back to the original discussion about Delaying
> > initial
> > > consumer group rebalance.
> > > I think i'm leaning towards sticking with the broker config and
> changing
> > > the delay so that the timer starts again when a new consumer joins the
> > > group. What are peoples thoughts on that?
> > >
> > > Doing something similar on leave is valid, but i'd prefer to consider
> it
> > > separately from this.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:
> > >
> > > > Matthias,
> > > >
> > > > Yes i know.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
> > > > wrote:
> > > >
> > > > Damian,
> > > >
> > > > about "rebalance immediately" on timeout -- I guess, that's a
> different
> > > > case as no LeaveGroupRequest will be sent. Thus, the broker should be
> > > > able to distinguish both cases easily, and apply the delay only if it
> > > > received the LeaveGroupRequest but not if a consumer times out.
> > > >
> > > > Does this make sense?
> > > >
> > > > -Matthias
> > > >
> > > > On 3/27/17 1:56 AM, Damian Guy wrote:
> > > > > @Becket
> > > > >
> > > > > Thanks for the feedback. Yes, i like the idea of extending the
> delay
> > as
> > > > > each new consumer joins the group. Though, i think this could be
> done
> > > > with
> > > > > either a consumer or broker side config. But i get your point that
> > some
> > > > > consumers in the group can be misconfigured.
> > > > >
> > > > > @Matthias & @Eno - yes we could probably do something similar if
> the
> > > > member
> > > > > has sent the LeaveGroupRequest. I'm not sure it would be valid if
> the
> > > > > member crashed, hence session.timeout would come into play, we'd
> > > probably
> > > > > want to rebalance immediately. I'd be interested in hearing
> thoughts
> > > from
> > > > > other core kafka folks on this one.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > >
> > > > >
> > > > > On Fri, 24 Mar 2017 at 23:01 Becket Qin 
> > wrote:
> > > > >
> > > > >> Hi Matthias,
> > > > >>
> > > > >> Yes, that was what I was thinking. We will keep delay it until
> > either
> > > > >> reaching the rebalance timeout or no new consumer joins in that
> > small
> > > > delay
> > > > >> which is configured on the broker side.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax <
> > > matth...@confluent.io
> > > > >
> > > > >> wrote:
> > > > >>
> > > > >>> @Becket:
> > > > >>>
> > > > >>> I am not sure, if I understand this correctly. Instead of
> applying
> > a
> > > > >>> fixed delay, that starts when the first consumer of an (empty)
> > group
> > > > >>> joins, you suggest to re-trigger/re-set the delay each time a new
> > > > >>> consumer joins?
> > > > >>>
> > > > >>> This sound like a good strategy to me, if the config is on the
> > broker
> > > > >> side.
> > > > >>>
> > > > >>> @Eno:
> > > > >>>
> > > > >>> I think that's a valid point and I like this idea!
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>>
> > > > >>> On 3/24/17 1:23 PM, Eno Thereska wrote:
> > > >  Thanks Damian,
> > > > 
> > > >  This KIP deals with the initial phase only. What about the cases
> > > when
> > > > >>> several cons

Build failed in Jenkins: kafka-trunk-jdk8 #1381

2017-03-28 Thread Apache Jenkins Server
See 


Changes:

[becket.qin] KAFKA-4586; Add purgeDataBefore() API (KIP-107)

--
[...truncated 162.30 KB...]

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenRacks 
STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenRacks 
PASSED

kafka.admin.AdminRackAwareTest > testAssignmentWith2ReplicasRackAware STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWith2ReplicasRackAware PASSED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenReplicas 
STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenReplicas 
PASSED

kafka.admin.AdminRackAwareTest > testSkipBrokerWithReplicaAlreadyAssigned 
STARTED

kafka.admin.AdminRackAwareTest > testSkipBrokerWithReplicaAlreadyAssigned PASSED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAware STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAware PASSED

kafka.admin.AdminRackAwareTest > testRackAwareExpansion STARTED

kafka.admin.AdminRackAwareTest > testRackAwareExpansion PASSED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6Partitions STARTED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6Partitions PASSED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers STARTED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers PASSED

kafka.admin.AdminRackAwareTest > 
testGetRackAlternatedBrokerListAndAssignReplicasToBrokers STARTED

kafka.admin.AdminRackAwareTest > 
testGetRackAlternatedBrokerListAndAssignReplicasToBrokers PASSED

kafka.admin.AdminRackAwareTest > testMoreReplicasThanRacks STARTED

kafka.admin.AdminRackAwareTest > testMoreReplicasThanRacks PASSED

kafka.admin.AdminRackAwareTest > testSingleRack STARTED

kafka.admin.AdminRackAwareTest > testSingleRack PASSED

kafka.admin.AdminRackAwareTest > 
testAssignmentWithRackAwareWithRandomStartIndex STARTED

kafka.admin.AdminRackAwareTest > 
testAssignmentWithRackAwareWithRandomStartIndex PASSED

kafka.admin.AdminRackAwareTest > testLargeNumberPartitionsAssignment STARTED

kafka.admin.AdminRackAwareTest > testLargeNumberPartitionsAssignment PASSED

kafka.admin.AdminRackAwareTest > testLessReplicasThanRacks STARTED

kafka.admin.AdminRackAwareTest > testLessReplicasThanRacks PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics 
STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics 
PASSED

kafka.admin.DeleteConsumerGroupTest > 
testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK STARTED

kafka.admin.DeleteConsumerGroupTest > 
testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK PASSED

kafka.admin.DeleteConsumerGroupTest > testTopicWideDeleteInZK STARTED

kafka.admin.DeleteConsumerGroupTest > testTopicWideDeleteInZK PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingOneTopic STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingOneTopic PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics PASSED

kafka.admin.DeleteConsumerGroupTest > testGroupWideDeleteInZK STARTED

kafka.admin.DeleteConsumerGroupTest > testGroupWideDeleteInZK PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacementAllServers STARTED

kafka.admin.AddPartitionsTest > testReplicaPlacementAllServers PASSED

kafka.admin.AddPartitionsTest > testWrongReplicaCount STARTED

kafka.admin.AddPartitionsTest > testWrongReplicaCount PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacementPartialServers STARTED

kafka.admin.AddPartitionsTest > testReplicaPlacementPartialServers PASSED

kafka.admin.AddPartitionsTest > testTopicDoesNotExist STARTED

kafka.admin.AddPartitionsTest > testTopicDoesNotExist PASSED

kafka.admin.AddPartitionsTest > testIncrementPartitions STARTED

kafka.admin.AddPartitionsTest > testIncrementPartitions PASSED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas STARTED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas PASSED

kafka.admin.ConfigCommandTest > testScramCredentials STARTED

kafka.admin.ConfigCommandTest > testScramCredentials PASSED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForTopicsEntityType STARTED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForTopicsEntityType PASSED

kafka.admin.ConfigCommandTest > testUse

Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Guozhang Wang
Just clarifying on "session.timeout.ms": today we already have a
rebalance.timeout value in the JoinGroupRequest protocol, which is used to
determine how long the coordinator will wait for each consumer to re-join the
group during the prepare-rebalance phase. I was thinking we could use that
value for the initial delay, so that even if we later decide to make this
rebalance timeout configurable on the client side (today it is hard-coded to
the max poll timeout in the Java client code, default 5 min, not the session
timeout value, default 10 sec: I was wrong before) we can do that without
bumping up the protocol.

Now thinking about it a bit more, I feel the main problem is around the
default value setting, since 5 minutes is definitely not acceptable.

On the other hand, I feel that keeping it as a broker-side config may be
sufficient, since this config is only used for the very first rebalance (i.e.
not for re-joining, which is controlled by the rebalance timeout value). It
only depends on how quickly a consumer app starts all of its instances, which
I think is most likely universal rather than one setting per app.

I like the idea of resetting the timer when new consumers join during the
initial delay. So I'd suggest we keep it as a broker-side config that resets
the timer on each new join, and do NOT extend it to the leave-group case, for
the motivations above: the expected time values would be different (3-5
seconds for the initial delay, versus tens of seconds for a leave-group delay,
where we would expect other instances to also leave or the leaving instance to
re-join soon).


Guozhang

On Tue, Mar 28, 2017 at 8:29 AM, Damian Guy  wrote:

> @Ismael - yeah sure we can reduce the default, though i'm not sure what the
> "right" default would be.
>
> On Tue, 28 Mar 2017 at 15:40 Ismael Juma  wrote:
>
> > Is 3 seconds the right default if the timer gets reset after each
> consumer
> > joins? Maybe we can lower the default value given the new approach.
> >
> > Ismael
> >
> > On Tue, Mar 28, 2017 at 9:53 AM, Damian Guy 
> wrote:
> >
> > > All,
> > > I'd like to get this back to the original discussion about Delaying
> > initial
> > > consumer group rebalance.
> > > I think i'm leaning towards sticking with the broker config and
> changing
> > > the delay so that the timer starts again when a new consumer joins the
> > > group. What are peoples thoughts on that?
> > >
> > > Doing something similar on leave is valid, but i'd prefer to consider
> it
> > > separately from this.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:
> > >
> > > > Matthias,
> > > >
> > > > Yes i know.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
> > > > wrote:
> > > >
> > > > Damian,
> > > >
> > > > about "rebalance immediately" on timeout -- I guess, that's a
> different
> > > > case as no LeaveGroupRequest will be sent. Thus, the broker should be
> > > > able to distinguish both cases easily, and apply the delay only if it
> > > > received the LeaveGroupRequest but not if a consumer times out.
> > > >
> > > > Does this make sense?
> > > >
> > > > -Matthias
> > > >
> > > > On 3/27/17 1:56 AM, Damian Guy wrote:
> > > > > @Becket
> > > > >
> > > > > Thanks for the feedback. Yes, i like the idea of extending the
> delay
> > as
> > > > > each new consumer joins the group. Though, i think this could be
> done
> > > > with
> > > > > either a consumer or broker side config. But i get your point that
> > some
> > > > > consumers in the group can be misconfigured.
> > > > >
> > > > > @Matthias & @Eno - yes we could probably do something similar if
> the
> > > > member
> > > > > has sent the LeaveGroupRequest. I'm not sure it would be valid if
> the
> > > > > member crashed, hence session.timeout would come into play, we'd
> > > probably
> > > > > want to rebalance immediately. I'd be interested in hearing
> thoughts
> > > from
> > > > > other core kafka folks on this one.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > >
> > > > >
> > > > > On Fri, 24 Mar 2017 at 23:01 Becket Qin 
> > wrote:
> > > > >
> > > > >> Hi Matthias,
> > > > >>
> > > > >> Yes, that was what I was thinking. We will keep delay it until
> > either
> > > > >> reaching the rebalance timeout or no new consumer joins in that
> > small
> > > > delay
> > > > >> which is configured on the broker side.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax <
> > > matth...@confluent.io
> > > > >
> > > > >> wrote:
> > > > >>
> > > > >>> @Becket:
> > > > >>>
> > > > >>> I am not sure, if I understand this correctly. Instead of
> applying
> > a
> > > > >>> fixed delay, that starts when the first consumer of an (empty)
> > group
> > > > >>> joins, you suggest to re-trigger/re-set the delay each time a new
> > > > >>> consume

Build failed in Jenkins: kafka-trunk-jdk7 #2045

2017-03-28 Thread Apache Jenkins Server
See 


Changes:

[becket.qin] KAFKA-4586; Add purgeDataBefore() API (KIP-107)

--
[...truncated 330.82 KB...]

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenRacks 
STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenRacks 
PASSED

kafka.admin.AdminRackAwareTest > testAssignmentWith2ReplicasRackAware STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWith2ReplicasRackAware PASSED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenReplicas 
STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAwareWithUnevenReplicas 
PASSED

kafka.admin.AdminRackAwareTest > testSkipBrokerWithReplicaAlreadyAssigned 
STARTED

kafka.admin.AdminRackAwareTest > testSkipBrokerWithReplicaAlreadyAssigned PASSED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAware STARTED

kafka.admin.AdminRackAwareTest > testAssignmentWithRackAware PASSED

kafka.admin.AdminRackAwareTest > testRackAwareExpansion STARTED

kafka.admin.AdminRackAwareTest > testRackAwareExpansion PASSED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6Partitions STARTED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6Partitions PASSED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers STARTED

kafka.admin.AdminRackAwareTest > 
testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers PASSED

kafka.admin.AdminRackAwareTest > 
testGetRackAlternatedBrokerListAndAssignReplicasToBrokers STARTED

kafka.admin.AdminRackAwareTest > 
testGetRackAlternatedBrokerListAndAssignReplicasToBrokers PASSED

kafka.admin.AdminRackAwareTest > testMoreReplicasThanRacks STARTED

kafka.admin.AdminRackAwareTest > testMoreReplicasThanRacks PASSED

kafka.admin.AdminRackAwareTest > testSingleRack STARTED

kafka.admin.AdminRackAwareTest > testSingleRack PASSED

kafka.admin.AdminRackAwareTest > 
testAssignmentWithRackAwareWithRandomStartIndex STARTED

kafka.admin.AdminRackAwareTest > 
testAssignmentWithRackAwareWithRandomStartIndex PASSED

kafka.admin.AdminRackAwareTest > testLargeNumberPartitionsAssignment STARTED

kafka.admin.AdminRackAwareTest > testLargeNumberPartitionsAssignment PASSED

kafka.admin.AdminRackAwareTest > testLessReplicasThanRacks STARTED

kafka.admin.AdminRackAwareTest > testLessReplicasThanRacks PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupWideDeleteInZKDoesNothingForActiveConsumerGroup PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics 
STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKDoesNothingForActiveGroupConsumingMultipleTopics 
PASSED

kafka.admin.DeleteConsumerGroupTest > 
testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK STARTED

kafka.admin.DeleteConsumerGroupTest > 
testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK PASSED

kafka.admin.DeleteConsumerGroupTest > testTopicWideDeleteInZK STARTED

kafka.admin.DeleteConsumerGroupTest > testTopicWideDeleteInZK PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingOneTopic STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingOneTopic PASSED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics STARTED

kafka.admin.DeleteConsumerGroupTest > 
testGroupTopicWideDeleteInZKForGroupConsumingMultipleTopics PASSED

kafka.admin.DeleteConsumerGroupTest > testGroupWideDeleteInZK STARTED

kafka.admin.DeleteConsumerGroupTest > testGroupWideDeleteInZK PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacementAllServers STARTED

kafka.admin.AddPartitionsTest > testReplicaPlacementAllServers PASSED

kafka.admin.AddPartitionsTest > testWrongReplicaCount STARTED

kafka.admin.AddPartitionsTest > testWrongReplicaCount PASSED

kafka.admin.AddPartitionsTest > testReplicaPlacementPartialServers STARTED

kafka.admin.AddPartitionsTest > testReplicaPlacementPartialServers PASSED

kafka.admin.AddPartitionsTest > testTopicDoesNotExist STARTED

kafka.admin.AddPartitionsTest > testTopicDoesNotExist PASSED

kafka.admin.AddPartitionsTest > testIncrementPartitions STARTED

kafka.admin.AddPartitionsTest > testIncrementPartitions PASSED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas STARTED

kafka.admin.AddPartitionsTest > testManualAssignmentOfReplicas PASSED

kafka.admin.ConfigCommandTest > testScramCredentials STARTED

kafka.admin.ConfigCommandTest > testScramCredentials PASSED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForTopicsEntityType STARTED

kafka.admin.ConfigCommandTest > shouldParseArgumentsForTopicsEntityType PASSED

kafka.admin.ConfigCommandTest > testUse

Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Jason Gustafson
Hey Damian,

Thanks for the KIP. I think the proposal makes sense as a workaround maybe
for some advanced users. However, I'm not sure we can depend on average
users knowing that the config exists, much less setting it to something
that makes sense. It's kind of a trend in streams that I'm not too thrilled
about to try and control these rebalances through careful tuning of various
timeouts. For example, the patch to avoid sending LeaveGroup depends on the
session timeout being set at least as long as the time for an average
rolling restart. If the expectation is that these settings are only needed
for advanced users, it may be sufficient, but if the problems are affecting
average users, it seems less than ideal. That said, if we can get some real
benefit from low-hanging fruit like this, then it's probably worthwhile.

This relates to the choice of default value, by the way. If we use 0 as the
default, my guess is that most users won't change it and the benefit could
be marginal. The choice of 3 seconds that you've documented seems fine to
me. It matches the default consumer heartbeat interval, which controls
typical rebalance latency, so there's some consistency there.

Also, one minor comment: I guess the actual delay for each group will be
the minimum of the group's rebalance timeout and
group.initial.rebalance.delay.ms. Is that right?

-Jason

On Tue, Mar 28, 2017 at 8:29 AM, Damian Guy  wrote:

> @Ismael - yeah sure we can reduce the default, though i'm not sure what the
> "right" default would be.
>
> On Tue, 28 Mar 2017 at 15:40 Ismael Juma  wrote:
>
> > Is 3 seconds the right default if the timer gets reset after each
> consumer
> > joins? Maybe we can lower the default value given the new approach.
> >
> > Ismael
> >
> > On Tue, Mar 28, 2017 at 9:53 AM, Damian Guy 
> wrote:
> >
> > > All,
> > > I'd like to get this back to the original discussion about Delaying
> > initial
> > > consumer group rebalance.
> > > I think i'm leaning towards sticking with the broker config and
> changing
> > > the delay so that the timer starts again when a new consumer joins the
> > > group. What are peoples thoughts on that?
> > >
> > > Doing something similar on leave is valid, but i'd prefer to consider
> it
> > > separately from this.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 28 Mar 2017 at 09:48 Damian Guy  wrote:
> > >
> > > > Matthias,
> > > >
> > > > Yes i know.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax 
> > > > wrote:
> > > >
> > > > Damian,
> > > >
> > > > about "rebalance immediately" on timeout -- I guess, that's a
> different
> > > > case as no LeaveGroupRequest will be sent. Thus, the broker should be
> > > > able to distinguish both cases easily, and apply the delay only if it
> > > > received the LeaveGroupRequest but not if a consumer times out.
> > > >
> > > > Does this make sense?
> > > >
> > > > -Matthias
> > > >
> > > > On 3/27/17 1:56 AM, Damian Guy wrote:
> > > > > @Becket
> > > > >
> > > > > Thanks for the feedback. Yes, i like the idea of extending the
> delay
> > as
> > > > > each new consumer joins the group. Though, i think this could be
> done
> > > > with
> > > > > either a consumer or broker side config. But i get your point that
> > some
> > > > > consumers in the group can be misconfigured.
> > > > >
> > > > > @Matthias & @Eno - yes we could probably do something similar if
> the
> > > > member
> > > > > has sent the LeaveGroupRequest. I'm not sure it would be valid if
> the
> > > > > member crashed, hence session.timeout would come into play, we'd
> > > probably
> > > > > want to rebalance immediately. I'd be interested in hearing
> thoughts
> > > from
> > > > > other core kafka folks on this one.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > >
> > > > >
> > > > > On Fri, 24 Mar 2017 at 23:01 Becket Qin 
> > wrote:
> > > > >
> > > > >> Hi Matthias,
> > > > >>
> > > > >> Yes, that was what I was thinking. We will keep delay it until
> > either
> > > > >> reaching the rebalance timeout or no new consumer joins in that
> > small
> > > > delay
> > > > >> which is configured on the broker side.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jiangjie (Becket) Qin
> > > > >>
> > > > >> On Fri, Mar 24, 2017 at 1:39 PM, Matthias J. Sax <
> > > matth...@confluent.io
> > > > >
> > > > >> wrote:
> > > > >>
> > > > >>> @Becket:
> > > > >>>
> > > > >>> I am not sure, if I understand this correctly. Instead of
> applying
> > a
> > > > >>> fixed delay, that starts when the first consumer of an (empty)
> > group
> > > > >>> joins, you suggest to re-trigger/re-set the delay each time a new
> > > > >>> consumer joins?
> > > > >>>
> > > > >>> This sound like a good strategy to me, if the config is on the
> > broker
> > > > >> side.
> > > > >>>
> > > > >>> @Eno:
> > > > >>>
> > > > >>> I think that's a valid point and I like this idea!
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>>
> > > > >>

[jira] [Commented] (KAFKA-4930) Connect Rest API allows creating connectors with an empty name

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945974#comment-15945974
 ] 

ASF GitHub Bot commented on KAFKA-4930:
---

GitHub user soenkeliebau opened a pull request:

https://github.com/apache/kafka/pull/2755

KAFKA-4930: Added connector name validator …

…to check for empty connector name and illegal characters in connector 
name. This also fixes  KAFKA-4938 by removing the check for slashes in 
connector name from ConnectorsResource.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soenkeliebau/kafka KAFKA-4930

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2755


commit 348d75dce006ddeb5beba0fc5ea5d761a4a87a82
Author: Soenke Liebau 
Date:   2017-03-28T21:06:43Z

KAFKA-4930: Added connector name validator to check for empty connector 
name and illegal characters in connector name. This also fixes  KAFKA-4938 by 
removing the check for slashes in connector name from ConnectorsResource.




> Connect Rest API allows creating connectors with an empty name
> --
>
> Key: KAFKA-4930
> URL: https://issues.apache.org/jira/browse/KAFKA-4930
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Sönke Liebau
>Priority: Minor
>
> The Connect Rest API allows to deploy connectors with an empty name field, 
> which then cannot be removed through the api.
> Sending the following request:
> {code}
> {
> "name": "",
> "config": {
> "connector.class": 
> "org.apache.kafka.connect.tools.MockSourceConnector",
> "tasks.max": "1",
> "topics": "test-topic"
> 
> }
> }
> {code}
> Results in a connector being deployed which can be seen in the list of 
> connectors:
> {code}
> [
>   "",
>   "testconnector"
> ]{code}
> But cannot be removed via a DELETE call, as the api thinks we are trying to 
> delete the /connectors endpoint and declines the request.
> I don't think there is a valid case for the connector name to be empty so 
> perhaps we should add a check for this. I am happy to work on this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2755: KAFKA-4930: Added connector name validator …

2017-03-28 Thread soenkeliebau
GitHub user soenkeliebau opened a pull request:

https://github.com/apache/kafka/pull/2755

KAFKA-4930: Added connector name validator …

…to check for empty connector name and illegal characters in connector 
name. This also fixes  KAFKA-4938 by removing the check for slashes in 
connector name from ConnectorsResource.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soenkeliebau/kafka KAFKA-4930

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2755


commit 348d75dce006ddeb5beba0fc5ea5d761a4a87a82
Author: Soenke Liebau 
Date:   2017-03-28T21:06:43Z

KAFKA-4930: Added connector name validator to check for empty connector 
name and illegal characters in connector name. This also fixes  KAFKA-4938 by 
removing the check for slashes in connector name from ConnectorsResource.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2017-03-28 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-4969:
--

 Summary: State-store workload-aware StreamsPartitionAssignor
 Key: KAFKA-4969
 URL: https://issues.apache.org/jira/browse/KAFKA-4969
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Currently, {{StreamPartitionsAssigner}} does not distinguish different "types" 
of tasks. For example, a task can be stateless or have one or multiple stores.

This can lead to suboptimal task placement: assume there are 2 stateless and 
2 stateful tasks and the app is running with 2 instances. To share the "store 
load" it would be good to place one stateless and one stateful task per 
instance. Right now, there is no guarantee about this, and it can happen that 
one instance processes both stateless tasks while the other processes both 
stateful tasks.

We should improve {{StreamPartitionAssignor}} and introduce "task types" 
including a cost model for task placement. We should consider the following 
parameters:
 - number of stores
 - number of sources/sinks
 - number of processors
 - regular task vs standby task

This improvement should be backed by a design document in the project wiki (no 
KIP required though) as it's a fairly complex change.
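For illustration only, a toy cost-based placement along these lines; the 
weights, classes and greedy heuristic are invented for this sketch and this is 
not the actual {{StreamPartitionAssignor}}:

{code}
import java.util.*;

public class CostAwarePlacementSketch {
    static class TaskInfo {
        final String taskId;
        final int stores, sourcesAndSinks, processors;
        final boolean standby;
        TaskInfo(String taskId, int stores, int sourcesAndSinks, int processors, boolean standby) {
            this.taskId = taskId; this.stores = stores; this.sourcesAndSinks = sourcesAndSinks;
            this.processors = processors; this.standby = standby;
        }
        // Invented weights: stores dominate, standby tasks count less than active ones.
        int cost() {
            int c = 10 * stores + 2 * sourcesAndSinks + processors;
            return standby ? c / 2 : c;
        }
    }

    // Greedy: place the most expensive tasks first, always on the least-loaded instance.
    static Map<String, List<TaskInfo>> assign(List<TaskInfo> tasks, List<String> instances) {
        Map<String, List<TaskInfo>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
            load.put(instance, 0);
        }
        tasks.sort(Comparator.comparingInt(TaskInfo::cost).reversed());
        for (TaskInfo task : tasks) {
            String leastLoaded = Collections.min(load.entrySet(), Map.Entry.comparingByValue()).getKey();
            assignment.get(leastLoaded).add(task);
            load.put(leastLoaded, load.get(leastLoaded) + task.cost());
        }
        return assignment;
    }
}
{code}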



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4930) Connect Rest API allows creating connectors with an empty name

2017-03-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945987#comment-15945987
 ] 

Sönke Liebau commented on KAFKA-4930:
-

I've created a PR with my approach as it currently stands. I probably still 
need to create an additional test case or two; for now I just fixed one broken 
test (moved the check for / in connector names to a different part of the code 
that was being mocked in that test).

If we go with this route we should probably discuss which characters we want to 
prohibit in connector names. I did a bit of googling in the hope of finding an 
RFC that states "these 5 characters are illegal in a REST URL" - but that seems 
to be a somewhat more [complicated 
topic|http://stackoverflow.com/questions/2366260/whats-valid-and-whats-not-in-a-uri-query].
Also there is of course the risk of prohibiting something that someone out 
there has already used, so we should be very careful here I think.
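Not the code in the PR -- just a sketch of what such a check could look like as 
a {{ConfigDef.Validator}}; the set of disallowed characters below is purely a 
placeholder:

{code}
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class ConnectorNameValidator implements ConfigDef.Validator {
    // Placeholder blacklist -- the actual characters to prohibit are still under discussion.
    private static final String ILLEGAL_CHARS = "/\\";

    @Override
    public void ensureValid(String name, Object value) {
        String connectorName = (String) value;
        if (connectorName == null || connectorName.trim().isEmpty())
            throw new ConfigException(name, value, "Connector name must not be empty");
        for (char c : ILLEGAL_CHARS.toCharArray()) {
            if (connectorName.indexOf(c) >= 0)
                throw new ConfigException(name, value, "Connector name must not contain '" + c + "'");
        }
    }
}
{code}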


> Connect Rest API allows creating connectors with an empty name
> --
>
> Key: KAFKA-4930
> URL: https://issues.apache.org/jira/browse/KAFKA-4930
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Sönke Liebau
>Priority: Minor
>
> The Connect Rest API allows to deploy connectors with an empty name field, 
> which then cannot be removed through the api.
> Sending the following request:
> {code}
> {
> "name": "",
> "config": {
> "connector.class": 
> "org.apache.kafka.connect.tools.MockSourceConnector",
> "tasks.max": "1",
> "topics": "test-topic"
> 
> }
> }
> {code}
> Results in a connector being deployed which can be seen in the list of 
> connectors:
> {code}
> [
>   "",
>   "testconnector"
> ]{code}
> But cannot be removed via a DELETE call, as the api thinks we are trying to 
> delete the /connectors endpoint and declines the request.
> I don't think there is a valid case for the connector name to be empty so 
> perhaps we should add a check for this. I am happy to work on this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4930) Connect Rest API allows creating connectors with an empty name

2017-03-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945987#comment-15945987
 ] 

Sönke Liebau edited comment on KAFKA-4930 at 3/28/17 9:22 PM:
--

I've created a PR with my approach as it currently stands. I probably still 
need to create an additional test case or two; for now I just fixed one broken 
test (moved the check for / in connector names to a different part of the code 
that was being mocked in that test). I also added a simple test for the case 
where no name is given at all.

If we go with this route we should probably discuss which characters we want to 
prohibit in connector names. I did a bit of googling in the hope of finding an 
RFC that states "these 5 characters are illegal in a REST URL" - but that seems 
to be a somewhat more [complicated 
topic|http://stackoverflow.com/questions/2366260/whats-valid-and-whats-not-in-a-uri-query].
Also there is of course the risk of prohibiting something that someone out 
there has already used, so we should be very careful here I think.



was (Author: sliebau):
I've created a PR with my approach as it currently stands. I probably still 
need to create an additional test case or two; for now I just fixed one broken 
test (moved the check for / in connector names to a different part of the code 
that was being mocked in that test).

If we go with this route we should probably discuss which characters we want to 
prohibit in connector names. I did a bit of googling in the hope of finding an 
RFC that states "these 5 characters are illegal in a REST URL" - but that seems 
to be a somewhat more [complicated 
topic|http://stackoverflow.com/questions/2366260/whats-valid-and-whats-not-in-a-uri-query].
Also there is of course the risk of prohibiting something that someone out 
there has already used, so we should be very careful here I think.


> Connect Rest API allows creating connectors with an empty name
> --
>
> Key: KAFKA-4930
> URL: https://issues.apache.org/jira/browse/KAFKA-4930
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Sönke Liebau
>Priority: Minor
>
> The Connect Rest API allows to deploy connectors with an empty name field, 
> which then cannot be removed through the api.
> Sending the following request:
> {code}
> {
> "name": "",
> "config": {
> "connector.class": 
> "org.apache.kafka.connect.tools.MockSourceConnector",
> "tasks.max": "1",
> "topics": "test-topic"
> 
> }
> }
> {code}
> Results in a connector being deployed which can be seen in the list of 
> connectors:
> {code}
> [
>   "",
>   "testconnector"
> ]{code}
> But cannot be removed via a DELETE call, as the api thinks we are trying to 
> delete the /connectors endpoint and declines the request.
> I don't think there is a valid case for the connector name to be empty so 
> perhaps we should add a check for this. I am happy to work on this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4963) Global Store: startup recovery process skipping processor

2017-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945998#comment-15945998
 ] 

Matthias J. Sax commented on KAFKA-4963:


Your observation is correct and the behavior is by design. Global stores are 
not designed to hold modified data but only to be populated with "raw" data 
from the topic they read from -- that topic serves as both source and changelog 
at once.

Thus, you would need a two-step process and "duplicate" your data: first read 
all data, apply your transformation, and write the modified data back into a 
topic; then use that topic to populate your store.
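For illustration, a rough sketch of that two-step approach on the 0.10.2 DSL; 
topic names, store name, and application ids are placeholders, and in practice 
the two steps would usually be separate Streams applications:

{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class GlobalStoreTwoStepExample {

    public static void main(String[] args) {
        // Step 1: read the raw topic, apply the transformation, and write the
        // derived values to a dedicated topic that can serve as the store's
        // source/changelog.
        KStreamBuilder transform = new KStreamBuilder();
        transform.stream(Serdes.String(), Serdes.String(), "topic")
                 .mapValues(String::length)
                 .to(Serdes.String(), Serdes.Integer(), "topic-lengths");
        new KafkaStreams(transform, props("value-length-transformer")).start();

        // Step 2: populate the global store directly from the transformed topic,
        // so restoring the store on startup never needs to run a processor.
        KStreamBuilder lookup = new KStreamBuilder();
        lookup.globalTable(Serdes.String(), Serdes.Integer(), "topic-lengths", "my-store");
        new KafkaStreams(lookup, props("value-length-lookup")).start();
    }

    private static Properties props(String appId) {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return p;
    }
}
{code}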

\cc [~damianguy] -- I think we can close this as "not a problem"? Please 
correct me if I said anything wrong.

> Global Store: startup recovery process skipping processor
> -
>
> Key: KAFKA-4963
> URL: https://issues.apache.org/jira/browse/KAFKA-4963
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Yennick Trevels
>
> This issue is related to the recovery process of a global store. It might be 
> that I'm misunderstanding the design of the global store as it's all quite 
> new to me, but I wanted to verify this case.
> I'm trying to create a global store with a processor which transforms the 
> values from the source and puts them into the state store, and I want all 
> these transformed values to be available in every streams job (therefore the 
> use of a global store)
> I'll give you an example which I created based on an existing Kafka Streams 
> unit test:
> {code}
> final StateStoreSupplier storeSupplier = Stores.create("my-store")
> 
> .withStringKeys().withIntegerValues().inMemory().disableLogging().build();
> final String global = "global";
> final String topic = "topic";
> final KeyValueStore globalStore = (KeyValueStore) storeSupplier.get();
> final TopologyBuilder topologyBuilder = this.builder
> .addGlobalStore(globalStore, global, STRING_DESERIALIZER, 
> STRING_DESERIALIZER, topic, "processor", define(new 
> ValueToLengthStatefulProcessor("my-store")));
> driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
> driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
> driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
> assertEquals("value1".length(), globalStore.get("key1"));
> assertEquals("value2".length(), globalStore.get("key2"));
> {code}
> The ValueToLengthStatefulProcessor basically takes the incoming value, 
> calculates the length of the string, and puts the result in the state store. 
> Note the difference in types between the source stream (string values) and 
> the data store (integer values)
> If I understand global stores correctly and based on what I've tried out 
> already, the stream of data runs like this:
> a source stream named "global" reading values from a Kafka topic called 
> "topic"  -> ValueToLengthStatefulProcessor -> "my-store" state store
> However, when the streams job starts up it runs the recovery process by 
> reading out the source stream again. I've noticed that in this case it seems 
> to skip the processor entirely and acts like the source stream is the 
> changelog of the state store, making the data flow like this during the 
> recovery process:
> source stream -> "my store" state store
> Because it acts like the source stream is the changelog of the state store, 
> it also tries to use the deserializer of the state store. This won't work 
> since the values of the state store should be integers, while the values in 
> the source stream are strings.
> So all this will startup nicely as long as the source stream has no values 
> yet. However, once the source stream has (string) values, the startup 
> recovery process will fail since it will be sourcing directly to the state 
> store instead of passing the source values to the processor.
> I believe this is caused by the following line of code in 
> TopologyBuilder.addGlobalStore, which connects the store directly to the 
> source topic.
> https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
> Please let me know if I'm totally misunderstanding how global stores should 
> work. But I think this might be a crucial bug or design flaw.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4960) Invalid state store exception

2017-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946030#comment-15946030
 ] 

Matthias J. Sax commented on KAFKA-4960:


Is it failing all the time? Or only occasionally? Do you have an 
`uncaughtExceptionHandler` registered to see if there is something else 
failing, that might be the root cause?
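For reference, registering such a handler is a one-liner on the {{KafkaStreams}} 
instance; the topology and configs below are placeholders, not the reporter's 
job:

{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class UncaughtHandlerExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("some-input-topic");   // placeholder topology

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uncaught-handler-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder, props);
        // Log whatever kills a StreamThread so the root cause behind the
        // InvalidStateStoreException symptom becomes visible.
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Stream thread " + thread.getName() + " died");
            throwable.printStackTrace();
        });
        streams.start();
    }
}
{code}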

> Invalid state store exception
> -
>
> Key: KAFKA-4960
> URL: https://issues.apache.org/jira/browse/KAFKA-4960
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: j yeargers
> Attachments: text.html
>
>
> Attempts to run windowed aggregation returns this exception:
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN   
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING 
> to NOT_RUNNING
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN   
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING 
> to NOT_RUNNING
> Exception in thread "StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=vi_preproc, 
> partition=1, offset=243574962
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store 
> %s has closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>   at 
> org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>   at 
> org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>   at 
> org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:84)
>   at 
> org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:35)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamWindowReduceProcessor.process(KStreamWindowReduce.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>   ... 2 more



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-134: Delay initial consumer group rebalance

2017-03-28 Thread Becket Qin
I think separating leave/join makes sense. The scenario I can think of for
delaying a rebalance on LeaveGroupRequest is a rolling bounce of a service.
But that scenario could be tricky because there may be a mixture of joining
and leaving. What happens if a consumer leaves the group right after another
consumer joins it? Which delay should be applied?

Jason, if I understand correctly, the actual delay of the FIRST rebalance
for each group could be anywhere between group.initial.rebalance.delay.ms and
the rebalance timeout, depending on how many times the delay is applied.
For example, say the delay is set to 3 seconds and the rebalance timeout is
set to 10 seconds. If a consumer joins the group at time T, the target
rebalance point would be T+3 if no other consumer joins. If another consumer
joins the group at T+2, the target point becomes T+5, and so on. However, no
matter how many times the delay is extended, at T+10 the rebalance will kick
off even if a new consumer joined the group at T+9.
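For illustration only, the arithmetic above as a tiny (invented) Java snippet --
not broker code:

{code}
public class InitialRebalanceDelayDemo {
    public static void main(String[] args) {
        long delayMs = 3_000L, rebalanceTimeoutMs = 10_000L;
        long firstJoinMs = 0L;                                   // "T"
        long hardDeadlineMs = firstJoinMs + rebalanceTimeoutMs;  // T+10: latest possible start

        long targetMs = firstJoinMs + delayMs;                   // T+3 after the first join
        targetMs = Math.min(2_000L + delayMs, hardDeadlineMs);   // second join at T+2 -> T+5
        targetMs = Math.min(9_000L + delayMs, hardDeadlineMs);   // late join at T+9 -> capped at T+10

        System.out.println("Rebalance fires at T+" + targetMs / 1000 + "s");
    }
}
{code}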

I also agree that we should set the default delay to some meaningful value
instead of setting it to 0.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 28, 2017 at 12:32 PM, Jason Gustafson 
wrote:

> Hey Damian,
>
> Thanks for the KIP. I think the proposal makes sense as a workaround maybe
> for some advanced users. However, I'm not sure we can depend on average
> users knowing that the config exists, much less setting it to something
> that makes sense. It's kind of a trend in streams that I'm not too thrilled
> about to try and control these rebalances through careful tuning of various
> timeouts. For example, the patch to avoid sending LeaveGroup depends on the
> session timeout being set at least as long as the time for an average
> rolling restart. If the expectation is that these settings are only needed
> for advanced users, it may be sufficient, but if the problems are affecting
> average users, it seems less than ideal. That said, if we can get some real
> benefit from low-hanging fruit like this, then it's probably worthwhile.
>
> This relates to the choice of default value, by the way. If we use 0 as the
> default, my guess is that most users won't change it and the benefit could
> be marginal. The choice of 3 seconds that you've documented seems fine to
> me. It matches the default consumer heartbeat interval, which controls
> typical rebalance latency, so there's some consistency there.
>
> Also, one minor comment: I guess the actual delay for each group will be
> the minimum of the group's rebalance timeout and
> group.initial.rebalance.delay.ms. Is that right?
>
> -Jason
>
> On Tue, Mar 28, 2017 at 8:29 AM, Damian Guy  wrote:
>
> > @Ismael - yeah sure we can reduce the default, though i'm not sure what
> the
> > "right" default would be.
> >
> > On Tue, 28 Mar 2017 at 15:40 Ismael Juma  wrote:
> >
> > > Is 3 seconds the right default if the timer gets reset after each
> > consumer
> > > joins? Maybe we can lower the default value given the new approach.
> > >
> > > Ismael
> > >
> > > On Tue, Mar 28, 2017 at 9:53 AM, Damian Guy 
> > wrote:
> > >
> > > > All,
> > > > I'd like to get this back to the original discussion about Delaying
> > > initial
> > > > consumer group rebalance.
> > > > I think i'm leaning towards sticking with the broker config and
> > changing
> > > > the delay so that the timer starts again when a new consumer joins
> the
> > > > group. What are peoples thoughts on that?
> > > >
> > > > Doing something similar on leave is valid, but i'd prefer to consider
> > it
> > > > separately from this.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Tue, 28 Mar 2017 at 09:48 Damian Guy 
> wrote:
> > > >
> > > > > Matthias,
> > > > >
> > > > > Yes i know.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Mon, 27 Mar 2017 at 18:17 Matthias J. Sax <
> matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > Damian,
> > > > >
> > > > > about "rebalance immediately" on timeout -- I guess, that's a
> > different
> > > > > case as no LeaveGroupRequest will be sent. Thus, the broker should
> be
> > > > > able to distinguish both cases easily, and apply the delay only if
> it
> > > > > received the LeaveGroupRequest but not if a consumer times out.
> > > > >
> > > > > Does this make sense?
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 3/27/17 1:56 AM, Damian Guy wrote:
> > > > > > @Becket
> > > > > >
> > > > > > Thanks for the feedback. Yes, i like the idea of extending the
> > delay
> > > as
> > > > > > each new consumer joins the group. Though, i think this could be
> > done
> > > > > with
> > > > > > either a consumer or broker side config. But i get your point
> that
> > > some
> > > > > > consumers in the group can be misconfigured.
> > > > > >
> > > > > > @Matthias & @Eno - yes we could probably do something similar if
> > the
> > > > > member
> > > > > > has sent the LeaveGroupRequest. I'm not sure it would be valid if
> > the
> > > > > > member cras

[jira] [Commented] (KAFKA-4938) Creating a connector with missing name parameter throws a NullPointerException

2017-03-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946075#comment-15946075
 ] 

Sönke Liebau commented on KAFKA-4938:
-

The fix for KAFKA-4930 as it is currently proposed would also fix this issue.

> Creating a connector with missing name parameter throws a NullPointerException
> --
>
> Key: KAFKA-4938
> URL: https://issues.apache.org/jira/browse/KAFKA-4938
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Sönke Liebau
>Assignee: Balint Molnar
>Priority: Minor
>  Labels: newbie
>
> Creating a connector via the rest api runs into a NullPointerException, when 
> omitting the name parameter in the request.
> {code}
> POST 127.0.0.1:8083/connectors
> {
>   "config": {
> "connector.class": "org.apache.kafka.connect.tools.MockSourceConnector",
> "tasks.max": "1",
> "topics": "test-topic"
>   }
> }
> {code}
> Results in a 500 return code, due to a NullPointerException being thrown when 
> checking the name for slashes 
> [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L91].
>  I believe this was introduced with the fix for 
> [KAFKA-4372|https://issues.apache.org/jira/browse/KAFKA-4372]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-03-28 Thread Florian Hussonnois
Hi all,

I've updated the KIP and the PR to reflect your suggestions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
https://github.com/apache/kafka/pull/2612

Also, I've exposed the StreamThread#state property as a string through the new
ThreadMetadata class.
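For anyone following along, a hypothetical usage sketch assuming the API lands
roughly as currently proposed (the method and class names are taken from the
KIP/PR discussion and may still change, so this is illustration only):

{code}
import org.apache.kafka.streams.KafkaStreams;

public class ThreadMetadataDump {
    // ThreadMetadata and TaskMetadata refer to the classes proposed in KIP-130.
    public static void dump(KafkaStreams streams) {
        for (ThreadMetadata thread : streams.localThreadsMetadata()) {
            System.out.println("thread=" + thread.threadName() + " state=" + thread.threadState());
            for (TaskMetadata task : thread.activeTasks()) {
                System.out.println("  " + task.taskId() + " -> " + task.topicPartitions());
            }
        }
    }
}
{code}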

Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois :

> Hi Guozhang, Matthias,
>
> It's a great idea to add sub topologies descriptions. This would help
> developers to better understand topology concept.
>
> I agree that is not really user-friendly to check if 
> `StreamsMetadata#streamThreads`
> is not returning null.
>
> The method name localThreadsMetadata looks good. In addition, it's more
> simple to build ThreadMetadata instances from the `StreamTask` class than
> from `StreamPartitionAssignor` class.
>
> I will work on modifications. As I understand, I have to add the property
> subTopologyId property to the TaskMetadata class - Am I right ?
>
> Thanks,
>
> 2017-03-26 0:25 GMT+01:00 Guozhang Wang :
>
>> Re 1): this is a good point. May be we can move
>> `StreamsMetadata#streamThreads` as `KafkaStreams#localThreadsMetadata`?
>>
>> 3): this is a minor suggestion about function name of
>> `assignedPartitions`, to `topicPartitions` to be consistent with
>> `StreamsMetadata`?
>>
>>
>> Guozhang
>>
>> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax 
>> wrote:
>>
>>> Thanks for the progress on this KIP. I think we are on the right path!
>>>
>>> Couple of comments/questions:
>>>
>>> (1) Why do we not consider the "rejected alternative" to add the method
>>> to KafkaStreams? The comment on #streamThreads() says:
>>>
>>> "Note this method will return null if called on {@link
>>> StreamsMetadata} which represent a remote application."
>>>
>>> Thus, if we cannot get any remote metadata, it seems not straight
>>> forward to not add it to KafkaStreams directly -- this would avoid
>>> invalid calls and `null` return value in the first place.
>>>
>>> I like the idea about exposing sub-topologies.:
>>>
>>> (2a) I would recommend to rename `topicsGroupId` to `subTopologyId` :)
>>>
>>> (2b) We could add this to KIP-120 already. However, I would not just
>>> link both via name, but leverage KIP-120 directly, and add a
>>> "Subtopology" member to the TaskMetadata class.
>>>
>>>
>>> Overall, I like the distinction of KIP-120 only exposing "static"
>>> information that can be determined before the topology get's started,
>>> while this KIP allow to access runtime information.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>> > Thanks for the updated KIP, and sorry for the late replies!
>>> >
>>> > I think a little bit more about KIP-130, and I feel that if we are
>>> going
>>> > to deprecate the `toString` function (it is not explicitly said in the
>>> > KIP, so I'm not sure if you plan to still keep the
>>> > `KafkaStreams#toString` as is or are going to replace it with the
>>> > proposed APIs) with the proposed ones, it may be okay. More
>>> > specifically, after both KIP-120 and KIP-130:
>>> >
>>> > 1. users can use `#describe` function to check the generated topology
>>> > before calling `KafkaStreams#start`, which is static information.
>>> > 2. users can use the `StreamsMetadata -> ThreadMetadata ->
>>> TaskMetadata`
>>> > programmatically after called `KafkaStreams#start` to get the
>>> > dynamically changeable information.
>>> >
>>> > One thing I'm still not sure though, is that in `TaskMetadata` we only
>>> > have the TaskId and assigned partitions, whereas in
>>> > "TopologyDescription" introduced in KIP-120, it will simply describe
>>> the
>>> > whole topology possibly composed of multiple sub-topologies. So it is
>>> > hard for users to tell which sub-topology is executed under which task
>>> > on-the-fly.
>>> >
>>> > Hence I'm thinking if we can expose the "sub-topology-id" (named as
>>> > topicsGroupId internally) in TopologyDescription#Subtopology, and then
>>> > from the task id which is essentially "sub-topology-id DASH
>>> > partition-group-id" users can make the link, though it is still not
>>> that
>>> > straight-forward.
>>> >
>>> > Thoughts?
>>> >
>>> > Guozhang
>>> >
>>> >
>>> >
>>> > On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>>> > mailto:fhussonn...@gmail.com>> wrote:
>>> >
>>> > Thanks Guozhang for pointing me to the KIP-120.
>>> >
>>> > I've made some modifications to the KIP. I also proposed a new PR
>>> > (there is
>>> > still some tests to make).
>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3
>>> A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>> > >> 3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API>
>>> >
>>> > Exposing consumed offsets through JMX is sufficient for debugging
>>> > purpose.
>>> > But I think this could be part to another JIRA as there is no
>>> impact to
>>> > publ

[jira] [Commented] (KAFKA-3992) InstanceAlreadyExistsException Error for Consumers Starting in Parallel

2017-03-28 Thread Charly Molter (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946118#comment-15946118
 ] 

Charly Molter commented on KAFKA-3992:
--

This is not a contribution.

While I understand this limit, asking for a unique client-id per thread seems 
like an unreasonable ask, for the following reasons:

1) Client-id is a concept that survives all the way to the broker. Having to 
aggregate per-thread ids means there would be an explosion in the number of 
metrics for the brokers.
2) Security and quotas depend heavily on client-id; e.g. this is extracted from 
the docs: "can be applied to (user, client-id), user or client-id groups”. 
Adding 1000 quota entries to allow an app with 1000 threads might be a bit 
annoying.
3) The doc is clearly not saying it should be unique: "An id string to pass to 
the server when making requests. The purpose of this is to be able to track the 
source of requests beyond just ip/port by allowing a logical application name 
to be included in server-side request logging.” or "The client id is a 
user-specified string sent in each request to help trace calls. It should 
logically identify the application making the request.”
4) KIP-98 does introduce a producer-id which is supposed to be unique; if 
client-id is already unique, what's the point of this producer-id?

It doesn’t seem that client id was created to identify a specific instance but 
to identify an application (which may have multiple instances of clients).

So it seems it's either unclear in the docs or a problem in the metrics API.
What do you think [~ewencp]?
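(For context, the per-instance workaround being debated looks like the sketch 
below -- giving each consumer its own client.id so each registers distinct 
mbeans; the group id and the id prefix are placeholders.)

{code}
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UniqueClientIdConsumers {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    public static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app");
        // Unique per consumer instance, so each registers its own JMX mbeans.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-app-consumer-" + COUNTER.getAndIncrement());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
{code}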

> InstanceAlreadyExistsException Error for Consumers Starting in Parallel
> ---
>
> Key: KAFKA-3992
> URL: https://issues.apache.org/jira/browse/KAFKA-3992
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.0.0
>Reporter: Alexander Cook
>Assignee: Ewen Cheslack-Postava
>
> I see the following error sometimes when I start multiple consumers at about 
> the same time in the same process (separate threads). Everything seems to 
> work fine afterwards, so should this not actually be an ERROR level message, 
> or could there be something going wrong that I don't see? 
> Let me know if I can provide any more info! 
> Error processing messages: Error registering mbean 
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>  
> Caused by: javax.management.InstanceAlreadyExistsException: 
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> Here is the full stack trace: 
> M[?:com.ibm.streamsx.messaging.kafka.KafkaConsumerV9.produceTuples:-1]  - 
> Error processing messages: Error registering mbean 
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>   at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>   at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>   at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:641)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:268)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
>   at 
> com.ibm.streamsx.messaging.kafka.KafkaConsumerV9.produceTuples(KafkaConsumerV9.java:129)
>   at 
> com.ibm.streamsx.messaging.kafka.KafkaConsumerV9$1.run(KafkaConsumerV

[DISCUSS] Time for 0.10.2.1 bugfix release

2017-03-28 Thread Gwen Shapira
Hi Team Kafka,

Since the 0.10.2.0 release we've fixed 13 JIRAs, a few of them rather critical, that
are targeted for 0.10.2.1:

https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%200.10.2.1%20ORDER%20BY%20priority%20DESC%2C%20key%20DESC

We also have a few outstanding issues for 0.10.2.1:

https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20fixVersion%20%3D%200.10.2.1%20ORDER%20BY%20priority%20DESC%2C%20key%20DESC

I think it's time for a 0.10.2.1 bugfix release!

Can the owners of the remaining issues get them resolved or move them to a
different release?

Once we get the "remaining" list to zero, I'll roll the first RC.

-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



[GitHub] kafka pull request #2746: KAFKA-4959: remove controller concurrent access to...

2017-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2746


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946126#comment-15946126
 ] 

ASF GitHub Bot commented on KAFKA-4959:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2746


> remove controller concurrent access to non-threadsafe NetworkClient, 
> Selector, and SSLEngine
> 
>
> Key: KAFKA-4959
> URL: https://issues.apache.org/jira/browse/KAFKA-4959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> This brought down a cluster by causing continuous controller moves.
> ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects 
> that aren't thread-safe:
> * Selector
> * NetworkClient
> * SSLEngine (this was the big one for us. We turn on SSL for interbroker 
> communication).
> As per the "Concurrency Notes" section from the [SSLEngine 
> javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
> bq. two threads must not attempt to call the same method (either wrap() or 
> unwrap()) concurrently
> SSLEngine.wrap gets called in:
> * SslTransportLayer.write
> * SslTransportLayer.handshake
> * SslTransportLayer.close
> It turns out that the ZkEventThread and RequestSendThread can concurrently 
> call SSLEngine.wrap:
> * ZkEventThread calls SslTransportLayer.close from 
> ControllerChannelManager.removeExistingBroker
> * RequestSendThread can call SslTransportLayer.write or 
> SslTransportLayer.handshake from NetworkClient.poll
> Suppose the controller moves for whatever reason. The former controller could 
> have had a RequestSendThread that was in the middle of sending out messages to 
> the cluster while the ZkEventThread began executing 
> KafkaController.onControllerResignation, which calls 
> ControllerChannelManager.shutdown, which sequentially cleans up the 
> controller-to-broker queue and connection for every broker in the cluster. 
> This cleanup includes the call to 
> ControllerChannelManager.removeExistingBroker as mentioned earlier, causing 
> the concurrent call to SSLEngine.wrap. This concurrent call throws a 
> BufferOverflowException which ControllerChannelManager.removeExistingBroker 
> catches so the ControllerChannelManager.shutdown moves onto cleaning up the 
> next controller-to-broker queue and connection, skipping the cleanup steps 
> such as clearing the queue, stopping the RequestSendThread, and removing the 
> entry from its brokerStateInfo map.
> By failing out of Selector.close, the sensors corresponding to the broker 
> connection have not been cleaned up. Any later attempt at initializing an 
> identical Selector will result in a sensor collision and therefore cause 
> Selector initialization to throw an exception. In other words, any later 
> attempts by this broker to become controller again will fail on 
> initialization. When controller initialization fails, the controller deletes 
> the /controller znode and lets another broker take over.
> Now suppose the controller moves enough times such that every broker hits the 
> BufferOverflowException concurrency issue. We're now guaranteed to fail 
> controller initialization due to the sensor collision on every controller 
> transition, so the controller will move across brokers continuously.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
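
For readers unfamiliar with the SSLEngine rule quoted in the description, the sketch 
below shows the invariant in isolation: all wrap() calls, whether they come from a 
write path or a close path, must be serialized, here with an explicit lock. This is 
only an illustration of the constraint, not the fix taken in the PR, which removes 
the controller's concurrent access rather than adding locking.

{code}
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;

public class GuardedSslWrapper {
    private final SSLEngine engine;
    private final ByteBuffer netBuffer;
    private final ReentrantLock wrapLock = new ReentrantLock();

    public GuardedSslWrapper() throws Exception {
        engine = SSLContext.getDefault().createSSLEngine();
        engine.setUseClientMode(true);
        netBuffer = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
    }

    // The write path and the close path both funnel through here, so
    // SSLEngine.wrap() is never called from two threads at the same time.
    public void wrapAndFlush(ByteBuffer appData) throws SSLException {
        wrapLock.lock();
        try {
            netBuffer.clear();
            engine.wrap(appData, netBuffer);
            // ... write netBuffer to the socket channel here ...
        } finally {
            wrapLock.unlock();
        }
    }
}
{code}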


[jira] [Updated] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

2017-03-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4959:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> remove controller concurrent access to non-threadsafe NetworkClient, 
> Selector, and SSLEngine
> 
>
> Key: KAFKA-4959
> URL: https://issues.apache.org/jira/browse/KAFKA-4959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.11.0.0, 0.10.2.1
>
>
> This brought down a cluster by causing continuous controller moves.
> ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects 
> that aren't thread-safe:
> * Selector
> * NetworkClient
> * SSLEngine (this was the big one for us. We turn on SSL for interbroker 
> communication).
> As per the "Concurrency Notes" section from the [SSLEngine 
> javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
> bq. two threads must not attempt to call the same method (either wrap() or 
> unwrap()) concurrently
> SSLEngine.wrap gets called in:
> * SslTransportLayer.write
> * SslTransportLayer.handshake
> * SslTransportLayer.close
> It turns out that the ZkEventThread and RequestSendThread can concurrently 
> call SSLEngine.wrap:
> * ZkEventThread calls SslTransportLayer.close from 
> ControllerChannelManager.removeExistingBroker
> * RequestSendThread can call SslTransportLayer.write or 
> SslTransportLayer.handshake from NetworkClient.poll
> Suppose the controller moves for whatever reason. The former controller could 
> have had a RequestSendThread that was in the middle of sending out messages to 
> the cluster while the ZkEventThread began executing 
> KafkaController.onControllerResignation, which calls 
> ControllerChannelManager.shutdown, which sequentially cleans up the 
> controller-to-broker queue and connection for every broker in the cluster. 
> This cleanup includes the call to 
> ControllerChannelManager.removeExistingBroker as mentioned earlier, causing 
> the concurrent call to SSLEngine.wrap. This concurrent call throws a 
> BufferOverflowException which ControllerChannelManager.removeExistingBroker 
> catches so the ControllerChannelManager.shutdown moves onto cleaning up the 
> next controller-to-broker queue and connection, skipping the cleanup steps 
> such as clearing the queue, stopping the RequestSendThread, and removing the 
> entry from its brokerStateInfo map.
> By failing out of Selector.close, the sensors corresponding to the broker 
> connection have not been cleaned up. Any later attempt at initializing an 
> identical Selector will result in a sensor collision and therefore cause 
> Selector initialization to throw an exception. In other words, any later 
> attempts by this broker to become controller again will fail on 
> initialization. When controller initialization fails, the controller deletes 
> the /controller znode and lets another broker take over.
> Now suppose the controller moves enough times such that every broker hits the 
> BufferOverflowException concurrency issue. We're now guaranteed to fail 
> controller initialization due to the sensor collision on every controller 
> transition, so the controller will move across brokers continuously.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

2017-03-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4959:
---
Fix Version/s: 0.10.2.1
   0.11.0.0

> remove controller concurrent access to non-threadsafe NetworkClient, 
> Selector, and SSLEngine
> 
>
> Key: KAFKA-4959
> URL: https://issues.apache.org/jira/browse/KAFKA-4959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.11.0.0, 0.10.2.1
>
>
> This brought down a cluster by causing continuous controller moves.
> ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects 
> that aren't thread-safe:
> * Selector
> * NetworkClient
> * SSLEngine (this was the big one for us. We turn on SSL for interbroker 
> communication).
> As per the "Concurrency Notes" section from the [SSLEngine 
> javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
> bq. two threads must not attempt to call the same method (either wrap() or 
> unwrap()) concurrently
> SSLEngine.wrap gets called in:
> * SslTransportLayer.write
> * SslTransportLayer.handshake
> * SslTransportLayer.close
> It turns out that the ZkEventThread and RequestSendThread can concurrently 
> call SSLEngine.wrap:
> * ZkEventThread calls SslTransportLayer.close from 
> ControllerChannelManager.removeExistingBroker
> * RequestSendThread can call SslTransportLayer.write or 
> SslTransportLayer.handshake from NetworkClient.poll
> Suppose the controller moves for whatever reason. The former controller could 
> have had a RequestSendThread that was in the middle of sending out messages to 
> the cluster while the ZkEventThread began executing 
> KafkaController.onControllerResignation, which calls 
> ControllerChannelManager.shutdown, which sequentially cleans up the 
> controller-to-broker queue and connection for every broker in the cluster. 
> This cleanup includes the call to 
> ControllerChannelManager.removeExistingBroker as mentioned earlier, causing 
> the concurrent call to SSLEngine.wrap. This concurrent call throws a 
> BufferOverflowException which ControllerChannelManager.removeExistingBroker 
> catches so the ControllerChannelManager.shutdown moves onto cleaning up the 
> next controller-to-broker queue and connection, skipping the cleanup steps 
> such as clearing the queue, stopping the RequestSendThread, and removing the 
> entry from its brokerStateInfo map.
> By failing out of Selector.close, the sensors corresponding to the broker 
> connection have not been cleaned up. Any later attempt at initializing an 
> identical Selector will result in a sensor collision and therefore cause 
> Selector initialization to throw an exception. In other words, any later 
> attempts by this broker to become controller again will fail on 
> initialization. When controller initialization fails, the controller deletes 
> the /controller znode and lets another broker take over.
> Now suppose the controller moves enough times such that every broker hits the 
> BufferOverflowException concurrency issue. We're now guaranteed to fail 
> controller initialization due to the sensor collision on every controller 
> transition, so the controller will move across brokers continuously.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4814) ZookeeperLeaderElector not respecting zookeeper.set.acl

2017-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946169#comment-15946169
 ] 

Ismael Juma commented on KAFKA-4814:


[~baluchicken], are you working on this JIRA? It would be good to include it in 
the next bug fix release that will be out soon.

> ZookeeperLeaderElector not respecting zookeeper.set.acl
> ---
>
> Key: KAFKA-4814
> URL: https://issues.apache.org/jira/browse/KAFKA-4814
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.10.1.1
>Reporter: Stevo Slavic
>Assignee: Balint Molnar
>  Labels: newbie
> Fix For: 0.11.0.0, 0.10.2.1
>
>
> Per the [migration 
> guide|https://kafka.apache.org/documentation/#zk_authz_migration] for 
> enabling ZooKeeper security on an existing Apache Kafka cluster, and the [broker 
> configuration 
> documentation|https://kafka.apache.org/documentation/#brokerconfigs] for the 
> {{zookeeper.set.acl}} configuration property, when this property is set to 
> false, Kafka brokers should not set any ACLs on ZooKeeper nodes, even 
> when a JAAS config file is provisioned to the broker. 
> The problem is that there is broker-side logic, such as that in 
> {{ZookeeperLeaderElector}} making use of {{JaasUtils#isZkSecurityEnabled}}, 
> which does not respect this configuration property, resulting in ACLs being 
> set even when just a JAAS config file is provisioned to the Kafka broker while 
> {{zookeeper.set.acl}} is set to {{false}}.
> Notice that {{JaasUtils}} is in the {{org.apache.kafka.common.security}} package 
> of the {{kafka-clients}} module, while {{zookeeper.set.acl}} is a broker-side-only 
> configuration property.
> To make it possible to enable ZooKeeper authentication on an existing cluster 
> without downtime, it should be possible to have all Kafka brokers in the cluster 
> first authenticate to the ZooKeeper cluster without ACLs being set. Only once 
> all ZooKeeper clients (Kafka brokers and others) are authenticating to the 
> ZooKeeper cluster can ACLs start being set.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
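
As an aside for readers of this ticket, a hedged sketch of the decision it asks for: 
apply ZooKeeper ACLs only when a JAAS config is present and zookeeper.set.acl is 
true, rather than on the JAAS check alone. The config plumbing below is illustrative 
only; the real broker reads the property through KafkaConfig, not a bare Properties 
object.

{code}
import java.util.Properties;

public class ZkAclDecision {
    // jaasConfigured would come from the JAAS check the broker already performs
    // (JaasUtils#isZkSecurityEnabled in the ticket); the Properties lookup stands
    // in for reading zookeeper.set.acl from the real broker configuration.
    public static boolean shouldSetZkAcls(boolean jaasConfigured, Properties brokerConfig) {
        boolean setAclEnabled =
                Boolean.parseBoolean(brokerConfig.getProperty("zookeeper.set.acl", "false"));
        // Per the ticket, ACLs should only be applied when BOTH conditions hold.
        return jaasConfigured && setAclEnabled;
    }
}
{code}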


[jira] [Updated] (KAFKA-3866) KerberosLogin refresh time bug and other improvements

2017-03-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3866:
---
Fix Version/s: (was: 0.10.2.1)

> KerberosLogin refresh time bug and other improvements
> -
>
> Key: KAFKA-3866
> URL: https://issues.apache.org/jira/browse/KAFKA-3866
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.10.0.0
>Reporter: Ismael Juma
>Assignee: Ismael Juma
> Fix For: 0.11.0.0
>
>
> ZOOKEEPER-2295 describes a bug in the Kerberos refresh time logic that is 
> also present in our KerberosLogin class. While looking at the code, I found a 
> number of things that could be improved. More details in the PR.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4922) Fix several FindBugs warnings in Clients and Connect

2017-03-28 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4922:
---
Fix Version/s: (was: 0.10.2.1)
   0.11.0.0

> Fix several FindBugs warnings in Clients and Connect
> 
>
> Key: KAFKA-4922
> URL: https://issues.apache.org/jira/browse/KAFKA-4922
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
> Environment: Identified by FindBugs; not software- or platform-specific
>Reporter: Daan Rennings
>Priority: Minor
>  Labels: newbie
> Fix For: 0.11.0.0
>
> Attachments: ClientsFindBugsReport.html, 
> ConnectAPIFindBugsReport.html, ConnectRuntimeFindBugsReport.html
>
>
> Four easy-to-fix warnings (not a complete set of the current FindBugs 
> warnings), as identified by FindBugs and stated in the attached reports:
> -org.apache.kafka.common.utils.Bytes.equals(Object) does not check for null 
> argument (Low priority, Bad Practice)
> -Primitive boxed just to call toString in 
> org.apache.kafka.connect.data.Decimal.builder(int) (High Priority, 
> Performance)
> -Primitive boxed just to call toString in new 
> org.apache.kafka.connect.runtime.Worker(String, Time, ConnectorFactory, 
> WorkerConfig, OffsetBackingStore) (High Priority, Performance)
> -Primitive is boxed to call Integer.compareTo(Integer): use 
> Integer.compare(int, int) instead (High Priority, Performance)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk7 #2046

2017-03-28 Thread Apache Jenkins Server
See 


Changes:

[becket.qin] Minor: Remove the accidentally checked in file which broke 
checkStyle.

--
[...truncated 895.79 KB...]

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.KafkaStreamsTest > testIllegalMetricsConfig STARTED

org.apache.kafka.streams.KafkaStreamsTest > testIllegalMetricsConfig PASSED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
testInitializesAndDestroysMetricsReporters STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
testInitializesAndDestroysMetricsReporters PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testLegalMetricsConfig STARTED

org.apache.kafka.streams.KafkaStreamsTest > testLegalMetricsConfig PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldReturnFalseOnCloseWhenThreadsHaventTerminated STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldReturnFalseOnCloseWhenThreadsHaventTerminated PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup PASSED

org.apache.kafka.streams.KafkaStreamsTest > testNumberDefaultMetrics STARTED

org.apache.kafka.streams.KafkaStreamsTest > testNumberDefaultMetric

Jenkins build is back to normal : kafka-0.10.2-jdk7 #114

2017-03-28 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-4970) Add a 'ProducerIdResource' to enable authorization for generating producer ids

2017-03-28 Thread Apurva Mehta (JIRA)
Apurva Mehta created KAFKA-4970:
---

 Summary: Add a 'ProducerIdResource' to enable authorization for 
generating producer ids
 Key: KAFKA-4970
 URL: https://issues.apache.org/jira/browse/KAFKA-4970
 Project: Kafka
  Issue Type: Improvement
Reporter: Apurva Mehta
Assignee: Apurva Mehta


With the KIP-98 idempotent producer, we introduce a new `InitPidRequest` which 
is called by a producer to get a producer id which is used to deduplicate 
messages on the broker. The broker will allocate a new producer id upon the 
receipt of this request and return it to the client. 

Currently, there is no authorization on the producer Id space. It would be good 
to add a producer id resource. This would mean that only authorized clients 
would get pids, and only authorized clients would be able to send messages 
which set the pid and sequence number.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4970) Add a 'ProducerIdResource' to enable authorization for generating producer ids

2017-03-28 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946333#comment-15946333
 ] 

Apurva Mehta commented on KAFKA-4970:
-

cc [~junrao] [~hachikuji] -- this is based on our discussion.

> Add a 'ProducerIdResource' to enable authorization for generating producer ids
> --
>
> Key: KAFKA-4970
> URL: https://issues.apache.org/jira/browse/KAFKA-4970
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>
> With the KIP-98 idempotent producer, we introduce a new `InitPidRequest` 
> which is called by a producer to get a producer id which is used to 
> deduplicate messages on the broker. The broker will allocate a new producer 
> id upon the receipt of this request and return it to the client. 
> Currently, there is no authorization on the producer Id space. It would be 
> good to add a producer id resource. This would mean that only authorized 
> clients would get pids, and only authorized clients would be able to send 
> messages which set the pid and sequence number.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk7 #2047

2017-03-28 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-4959; Remove controller concurrent access to non-threadsafe

--
[...truncated 900.22 KB...]

org.apache.kafka.streams.KafkaStreamsTest > testNumberDefaultMetrics STARTED

org.apache.kafka.streams.KafkaStreamsTest > testNumberDefaultMetrics PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithZeroByteCache STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithZeroByteCache PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithNonZeroByteCache STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithNonZeroByteCache PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithZeroSizedCache STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithZeroSizedCache PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithNonZeroSizedCache STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryStateWithNonZeroSizedCache PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecified PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testShouldReadFromRegexAndNamedTopics PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenCreated PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testMultipleConsumersCanReadFromPartitionedTopic PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testRegexMatchesTopicsAWhenDeleted PASSED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns STARTED

org.apache.kafka.streams.integration.RegexSourceIntegrationTest > 
testNoMessagesSentExceptionFromOverlappingPatterns PASSED

org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > 
shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache STARTED

org.apache.kafka.streams.integration.KStreamRepartitionJoinTest > 
shouldCorrectlyRepartitionOnJoinOperationsWith

[GitHub] kafka pull request #2738: MINOR: Support streaming decompression of fetched ...

2017-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2738


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk8 #1382

2017-03-28 Thread Apache Jenkins Server
See 


Changes:

[becket.qin] Minor: Remove the accidentally checked in file which broke 
checkStyle.

--
[...truncated 1.68 MB...]

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithZeroByteCache STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithZeroByteCache PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithNonZeroByteCache STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegionWithNonZeroByteCache PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduce STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduce PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregate STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregate PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCount STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCount PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCountSessionWindows STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCountSessionWindows PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregateWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregateWindowed PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKTableKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKTableKTable PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > testLeftKTableKTable 
STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > testLeftKTableKTable 
PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKStream STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKStream PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKTable PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKTableKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKTableKTable PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKStream STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKStream PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKStreamKStream STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKStreamKStream PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKTable PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldInnerLeftJoin STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldInnerLeftJoin PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldOuterOuterJoin STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldOuterOuterJoin PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldInnerOuterJoin STARTED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldInnerOuterJoin PASSED

org.apache.kafka.streams.integration.KTableKTableJoinIntegrationTest > 
shouldLeftInnerJoin STARTED

org

[GitHub] kafka pull request #2742: KAFKA-4574: Ignore test_zk_security_upgrade until ...

2017-03-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2742


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4574) Transient failure in ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade with security_protocol = SASL_PLAINTEXT, SSL

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946365#comment-15946365
 ] 

ASF GitHub Bot commented on KAFKA-4574:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2742


> Transient failure in ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade 
> with security_protocol = SASL_PLAINTEXT, SSL
> ---
>
> Key: KAFKA-4574
> URL: https://issues.apache.org/jira/browse/KAFKA-4574
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Reporter: Shikhar Bhushan
>Assignee: Apurva Mehta
>
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-12-29--001.1483003056--apache--trunk--dc55025/report.html
> {{ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade}} failed with these 
> {{security_protocol}} parameters 
> {noformat}
> 
> test_id:
> kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SASL_PLAINTEXT
> status: FAIL
> run time:   3 minutes 44.094 seconds
> 1 acked message did not make it to the Consumer. They are: [5076]. We 
> validated that the first 1 of these missing messages correctly made it into 
> Kafka's data files. This suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 117, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 101, in run_produce_consume_validate
> self.validate()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 163, in validate
> assert success, msg
> AssertionError: 1 acked message did not make it to the Consumer. They are: 
> [5076]. We validated that the first 1 of these missing messages correctly 
> made it into Kafka's data files. This suggests they were lost on their way to 
> the consumer.
> {noformat}
> {noformat}
> 
> test_id:
> kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status: FAIL
> run time:   3 minutes 50.578 seconds
> 1 acked message did not make it to the Consumer. They are: [3559]. We 
> validated that the first 1 of these missing messages correctly made it into 
> Kafka's data files. This suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 117, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 101, in run_produce_consume_validate
> self.validate()
>   File 
> "/v

[GitHub] kafka pull request #2756: KAFKA-4818: Implement transactional producer

2017-03-28 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2756

KAFKA-4818: Implement transactional producer

add public API for transactions
 - add new Producer methods to interface
 - add new Producer and Consumer config parameters

We want to add the API early to break the dependency for Streams EoS 
implementation and allow for parallel development.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
exactly-once-transactions-add-public-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2756


commit c837bea3a475538add095b92d7313eefaa5f1817
Author: Matthias J. Sax 
Date:   2017-03-08T23:17:06Z

KAFKA-4818: add public API for transactions
 - add new Producer methods to interface
 - add new Producer and Consumer config parameters




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
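
For readers who have not followed KIP-98, a hedged sketch of the kind of usage these 
new producer methods are meant to enable, using the method and config names from the 
KIP (transactional.id, initTransactions, beginTransaction, commitTransaction, 
abortTransaction). The exact signatures in this PR may differ, and the topic and 
serializer settings are illustrative only.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-app-tx-1"); // new config proposed in KIP-98
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();   // register the transactional.id and fence older instances
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("output-topic", "key", "value"));
        producer.commitTransaction();  // or abortTransaction() on error to discard the batch
        producer.close();
    }
}
{code}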


[jira] [Commented] (KAFKA-4818) Implement transactional producer

2017-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946370#comment-15946370
 ] 

ASF GitHub Bot commented on KAFKA-4818:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2756

KAFKA-4818: Implement transactional producer

add public API for transactions
 - add new Producer methods to interface
 - add new Producer and Consumer config parameters

We want to add the API early to break the dependency for Streams EoS 
implementation and allow for parallel development.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
exactly-once-transactions-add-public-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2756


commit c837bea3a475538add095b92d7313eefaa5f1817
Author: Matthias J. Sax 
Date:   2017-03-08T23:17:06Z

KAFKA-4818: add public API for transactions
 - add new Producer methods to interface
 - add new Producer and Consumer config parameters




> Implement transactional producer
> 
>
> Key: KAFKA-4818
> URL: https://issues.apache.org/jira/browse/KAFKA-4818
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, core, producer 
>Reporter: Jason Gustafson
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> This covers the implementation of the transaction coordinator and the changes 
> to the producer and consumer to support transactions. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-135 : Send of null key to a compacted topic should throw non-retriable error back to user

2017-03-28 Thread Ismael Juma
Hi,

I think error messages and error codes serve different purposes. Error
messages provide additional information about the error, but users should
never have to match on a message to handle an error/exception. For this
case, it seems like this is a fatal error so we could get away with just
using an error message. Having said that, InvalidKeyError is not too
specific and I'm OK with that too.

As I said earlier, I do think that we need to change the following

"It is recommended that we upgrade the clients before the broker is
upgraded, so that the clients would be able to understand the new
exception."

This is problematic since we want older clients to work with newer brokers.
That's why I recommended that we only throw this error if the
ProduceRequest is version 3 or higher.

Ismael

P.S. Note that we already send error messages back for the CreateTopics
protocol API (I added that in the previous release).
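
To make the type-vs-message point concrete, here is a minimal sketch of how a 
producer callback can branch on exception type rather than message text: anything 
that is an org.apache.kafka.common.errors.RetriableException may be retried, while 
a fatal error such as the one proposed in this KIP (whatever its final class name) 
would not extend it and should fail the send. The topic name and the handling in 
the callback are illustrative only.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class SendWithTypedErrorHandling {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A record with a null key sent to a compacted topic is the case discussed here.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("some-compacted-topic", null, "value");
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    return; // success
                } else if (exception instanceof RetriableException) {
                    System.err.println("Transient error, safe to retry: " + exception);
                } else {
                    System.err.println("Fatal for this record, do not retry: " + exception);
                }
            });
        }
    }
}
{code}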

On Tue, Mar 28, 2017 at 7:22 AM, Mayuresh Gharat  wrote:

> I think, it's OK to do this right now.
> The other KIP will have a wider base to cover as it will include other
> exceptions as well and will take time.
>
> Thanks,
>
> Mayuresh
>
> On Mon, Mar 27, 2017 at 11:20 PM Dong Lin  wrote:
>
> > Sorry, I forget that you have mentioned this idea in your previous
> reply. I
> > guess the question is, do we still need this KIP if we can have custom
> > error message specified in the exception via the other KIP?
> >
> >
> > On Mon, Mar 27, 2017 at 11:00 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Dong,
> > >
> > > I do agree with that as I said before the thought did cross my mind
> and I
> > > am working on getting another KIP ready to have error responses
> returned
> > > back to the client.
> > >
> > > In my opinion, it's OK to add a new error code if it justifies the
> need.
> > As
> > > Ismael, mentioned on the jira, we need a specific non retriable error
> > code
> > > in this case, with specific message, at least until the other KIP is
> > ready.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > > On Mon, Mar 27, 2017 at 10:55 PM Dong Lin  wrote:
> > >
> > > > Hey Mayuresh,
> > > >
> > > > I get that you want to provide a more specific error message to user.
> > > Then
> > > > would it be more useful to have a KIP that allows custom error
> message
> > to
> > > > be returned to client together with the exception in the response?
> For
> > > > example, broker can include in the response
> > PolicyViolationException("key
> > > > can not be null for non-compact topic ${topic}") and client can print
> > > this
> > > > error message in the log. My concern with current KIP that it is not
> > very
> > > > scalable to always have a KIP and class for every new error we may
> see
> > in
> > > > the future. The list of error classes, and the errors that need to be
> > > > caught and handled by the client code, will increase overtime and
> > become
> > > > harder to maintain.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Mon, Mar 27, 2017 at 7:20 PM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com
> > > > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > I had thought about this before and wanted to do similar thing. But
> > as
> > > > was
> > > > > pointed out in the jira ticket, we wanted something more specific
> > than
> > > > > general.
> > > > > The main issue is that we do not propagate server side error
> messages
> > > to
> > > > > clients, right now. I am working on a KIP proposal to propose it.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > > On Mon, Mar 27, 2017 at 5:55 PM, Dong Lin 
> > wrote:
> > > > >
> > > > > > Hey Mayuresh,
> > > > > >
> > > > > > Thanks for the patch. I am wondering if it would be better to
> add a
> > > > more
> > > > > > general error, e.g. InvalidMessageException. The benefit is that
> we
> > > can
> > > > > > reuse this for other message level error instead of adding one
> > > > exception
> > > > > > class for each possible exception in the future. This is similar
> to
> > > the
> > > > > use
> > > > > > of InvalidRequestException. For example, ListOffsetResponse may
> > > return
> > > > > > InvalidRequestException if duplicate partitions are found in the
> > > > > > ListOffsetRequest. We don't return DuplicatedPartitionException
> in
> > > this
> > > > > > case.
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Mar 22, 2017 at 3:07 PM, Mayuresh Gharat <
> > > > > > gharatmayures...@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > We have created KIP-135 to propose that Kafka should return a
> > > > > > non-retriable
> > > > > > > error when the producer produces a message with null key to a
> log
> > > > > > compacted
> > > > > > > topic.
> > > > > > >
> > > > > > > Please find the KIP wiki in the link :
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>

[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2017-03-28 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946384#comment-15946384
 ] 

Stephane Maarek commented on KAFKA-3083:


Just hit this issue in prod, and the only solution was to reboot the broker 
that got put in an inconsistent state

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>  Labels: reliability
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation where Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2017-03-28 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946384#comment-15946384
 ] 

Stephane Maarek edited comment on KAFKA-3083 at 3/29/17 1:42 AM:
-

Just hit this issue in prod with Kafka 0.10.2.0, and the only solution was to 
reboot the broker that got put in an inconsistent state


was (Author: stephane.maa...@gmail.com):
Just hit this issue in prod, and the only solution was to reboot the broker 
that got put in an inconsistent state

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>  Labels: reliability
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation where Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3436) Speed up controlled shutdown.

2017-03-28 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946392#comment-15946392
 ] 

Stephane Maarek commented on KAFKA-3436:


[~onurkaraman] I can't find the JIRA / PR regarding your re-write of the 
controller. Would that address https://issues.apache.org/jira/browse/KAFKA-3083 
too?

> Speed up controlled shutdown.
> -
>
> Key: KAFKA-3436
> URL: https://issues.apache.org/jira/browse/KAFKA-3436
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>
> Currently, rolling bouncing a Kafka cluster with tens of thousands of 
> partitions can take very long (~2 min for each broker with ~5000 
> partitions/broker in our environment). The majority of the time is spent 
> shutting down a broker. The time to shut down a broker usually includes the 
> following parts:
> T1: During controlled shutdown, people usually want to make sure there are no 
> under-replicated partitions, so shutting down a broker during a rolling 
> bounce has to wait for the previously restarted broker to catch up. This is 
> T1.
> T2: The time to send the controlled shutdown request and receive the 
> controlled shutdown response. Currently a controlled shutdown request 
> triggers many LeaderAndIsrRequests and UpdateMetadataRequests, and also 
> involves many ZooKeeper updates performed serially.
> T3: The actual time to shut down all the components. It is usually small 
> compared with T1 and T2.
> T1 is related to:
> A) the inbound throughput on the cluster, and 
> B) the "down" time of the broker (the time between the replica fetchers 
> stopping and restarting)
> The larger the traffic is, or the longer the broker stops fetching, the 
> longer it will take for the broker to catch up and get back into the ISR, 
> and therefore the longer T1 will be. Assume:
> * the inbound network traffic is X bytes/second on a broker
> * the time T1.B ("down" time) mentioned above is T
> Theoretically it will take (X * T) / (NetworkBandwidth - X) = 
> InboundNetworkUtilization * T / (1 - InboundNetworkUtilization) for the 
> broker to catch up after the restart. While X is out of our control, T is 
> largely related to T2.
> The purpose of this ticket is to reduce T2 by:
> 1. Batching the LeaderAndIsrRequest and UpdateMetadataRequest during 
> controlled shutdown.
> 2. Using asynchronous ZooKeeper writes to pipeline the ZooKeeper updates. 
> According to the ZooKeeper wiki 
> (https://wiki.apache.org/hadoop/ZooKeeper/Performance), a 3-node ZK cluster 
> should be able to handle ~20K writes/second (1 KB each). So if we use async 
> writes, we will likely be able to reduce the ZooKeeper update time to a few 
> seconds or even sub-second levels.
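
As a rough illustration of the catch-up formula quoted above, here is a
back-of-the-envelope sketch with made-up numbers (not part of the ticket):

{code}
// Back-of-the-envelope sketch of the catch-up-time estimate quoted above:
// catchUpTime = (X * T) / (NetworkBandwidth - X). All numbers are made up
// purely for illustration.
public class CatchUpTimeEstimate {
    public static void main(String[] args) {
        double inboundBytesPerSec = 50e6;   // X: 50 MB/s inbound traffic on the broker
        double networkBandwidth   = 125e6;  // ~1 Gbps NIC expressed in bytes/s
        double downTimeSeconds    = 60;     // T: time the replica fetchers were stopped

        double catchUpSeconds =
                (inboundBytesPerSec * downTimeSeconds) / (networkBandwidth - inboundBytesPerSec);

        // With these numbers: (50e6 * 60) / (125e6 - 50e6) = 40 seconds, so every
        // second shaved off the "down" time (which includes T2) also shortens T1.
        System.out.printf("Estimated catch-up time: %.1f seconds%n", catchUpSeconds);
    }
}
{code}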



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP 130: Expose states of active tasks to KafkaStreams public API

2017-03-28 Thread Matthias J. Sax
Thanks for updating the KIP!

I think it's good as is -- I would not add anything more to TaskMetadata.

About subtopologies and tasks: we do have the concept of subtopologies
already in KIP-120. It's only missing an ID that allows linking a
subtopology to a task.

IMHO, adding a simple variable to `Subtopology` that provides the id
should be sufficient. We can simply document in the JavaDocs how
Subtopology and TaskMetadata can be linked to each other.
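
To make this concrete, here is a minimal, hypothetical sketch of how an
application could correlate the two once the id is exposed. The class shapes
and the task-id format ("<subTopologyId>_<partition>") are assumptions for
illustration only, not the final KIP-120/KIP-130 API:

{code}
import java.util.List;
import java.util.Optional;

// Hypothetical sketch only -- the real KIP-120/KIP-130 classes may differ.
class Subtopology {
    final int id;                        // the id proposed to be added to Subtopology
    Subtopology(int id) { this.id = id; }
}

class TaskMetadata {
    final String taskId;                 // assumed format: "<subTopologyId>_<partition>"
    TaskMetadata(String taskId) { this.taskId = taskId; }

    int subTopologyId() {                // extract the sub-topology part of the task id
        return Integer.parseInt(taskId.split("_")[0]);
    }
}

class TaskToSubtopologyLink {
    static Optional<Subtopology> subtopologyFor(TaskMetadata task, List<Subtopology> all) {
        return all.stream().filter(s -> s.id == task.subTopologyId()).findFirst();
    }
}
{code}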

I did update KIP-120 accordingly.


-Matthias

On 3/28/17 3:45 PM, Florian Hussonnois wrote:
> Hi all,
> 
> I've updated the KIP and the PR to reflect your suggestions.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> https://github.com/apache/kafka/pull/2612
> 
> Also, I've exposed the StreamThread#state property as a string through the
> new class ThreadMetadata.
> 
> Thanks,
> 
> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois  >:
> 
> Hi Guozhang, Matthias,
> 
> It's a great idea to add sub-topology descriptions. This would
> help developers better understand the topology concept.
> 
> I agree that it is not really user-friendly to have to check that
> `StreamsMetadata#streamThreads` is not returning null.
> 
> The method name localThreadsMetadata looks good. In addition, it's
> simpler to build ThreadMetadata instances from the `StreamTask`
> class than from the `StreamPartitionAssignor` class.
> 
> I will work on the modifications. As I understand it, I have to add a
> subTopologyId property to the TaskMetadata class - am I right?
> 
> Thanks,
> 
> 2017-03-26 0:25 GMT+01:00 Guozhang Wang  >:
> 
> Re 1): this is a good point. Maybe we can move
> `StreamsMetadata#streamThreads` to
> `KafkaStreams#localThreadsMetadata`?
> 
> 3): a minor suggestion about the function name: rename
> `assignedPartitions` to `topicPartitions` to be consistent with
> `StreamsMetadata`?
> 
> 
> Guozhang
> 
> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax
> mailto:matth...@confluent.io>> wrote:
> 
> Thanks for the progress on this KIP. I think we are on the
> right path!
> 
> Couple of comments/questions:
> 
> (1) Why do we not consider the "rejected alternative" to add the method
> to KafkaStreams? The comment on #streamThreads() says:
> 
> "Note this method will return null if called on {@link
> StreamsMetadata} which represents a remote application."
> 
> Thus, if we cannot get any remote metadata anyway, it seems odd not to
> add it to KafkaStreams directly -- this would avoid invalid calls and
> `null` return values in the first place.
> 
> I like the idea of exposing sub-topologies:
> 
> (2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)
> 
> (2b) We could add this to KIP-120 already. However, I would not just
> link both via name, but leverage KIP-120 directly, and add a
> "Subtopology" member to the TaskMetadata class.
> 
> 
> Overall, I like the distinction of KIP-120 only exposing "static"
> information that can be determined before the topology gets started,
> while this KIP allows access to runtime information.
> 
> 
> 
> -Matthias
> 
> 
> On 3/22/17 12:42 PM, Guozhang Wang wrote:
> > Thanks for the updated KIP, and sorry for the late replies!
> >
> > I thought a little bit more about KIP-130, and I feel that if we are going
> > to deprecate the `toString` function (it is not explicitly said in the
> > KIP, so I'm not sure if you plan to still keep the
> > `KafkaStreams#toString` as is or are going to replace it with the
> > proposed APIs) with the proposed ones, it may be okay. More
> > specifically, after both KIP-120 and KIP-130:
> >
> > 1. users can use the `#describe` function to check the generated topology
> > before calling `KafkaStreams#start`, which is static information.
> > 2. users can use the `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
> > programmatically after calling `KafkaStreams#start` to get the
> > dynamically changeable information.
> >
> > One thing I'm still not sure about, though, is that in `TaskMetadata` we
> > only have the TaskId and assigned partitions, whereas in
> > "TopologyDescription" introduced in KIP-120, it will

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-28 Thread Matthias J. Sax
With regard to KIP-130:

From the KIP-130 thread:

> About subtopologies and tasks: we do have the concept of subtopologies 
> already in KIP-120. It's only missing an ID that allows linking a subtopology 
> to a task.
> 
> IMHO, adding a simple variable to `Subtopology` that provides the id should be 
> sufficient. We can simply document in the JavaDocs how Subtopology and 
> TaskMetadata can be linked to each other.

I updated KIP-120 to include a field for this.


-Matthias


On 3/27/17 4:27 PM, Matthias J. Sax wrote:
> Hi,
> 
> I would like to trigger this discussion again. It seems that the naming
> question is rather subjective and both main alternatives (w/ or w/o the
> word "Topology" in the name) have pros/cons.
> 
> If you have any further thought, please share it. At the moment I still
> propose `StreamsBuilder` in the KIP.
> 
> I also want to point out that the VOTE thread has already been started. So
> if you like the current KIP, please cast your vote there.
> 
> 
> Thanks a lot!
> 
> 
> -Matthias
> 
> 
> On 3/23/17 3:38 PM, Matthias J. Sax wrote:
>> Jay,
>>
>> about the naming schema:
>>
1. "kstreams" - the DSL
2. "processor api" - the lower level callback/topology api
3. KStream/KTable - entities in the kstreams dsl
4. "Kafka Streams" - General name for stream processing stuff in Kafka,
including both kstreams and the processor API plus the underlying
implementation.
>>
>> I think this terminology has some issues... To me, `kstreams` was
>> never more than an abbreviation for `Kafka Streams` -- thus (1) and
>> (4) kinda collide here. Following questions on the mailing list etc., I
>> often see people using kstreams or kstream exactly as an abbreviation for
>> "Kafka Streams".
>>
>>> I think referring to the dsl as "kstreams" is cute and mnemonic and not
>>> particularly confusing.
>>
>> I disagree here. It's a very subtle difference between `kstreams` and
>> `KStream` -- just singular/plural, thus (1) and (3) also "collide" --
>> it's just too close to each other.
>>
>> Thus, I really think it's a good idea to get a new name for the DSL to
>> get a better separation of the 4 concepts.
>>
>> Furthermore, we use the term "Streams API". Thus, I think both
>> `StreamsBuilder` and `StreamsTopologyBuilder` are very good names.
>>
>>
>> Thus, I prefer to keep the KIP as is (suggesting `StreamsBuilder`).
>>
>> I will start a VOTE thread. Of course, we can still discuss the naming
>> issue. :)
>>
>>
>>
>> -Matthias
>>
>>
>> On 3/22/17 8:53 PM, Jay Kreps wrote:
>>> I don't feel strongly on this, so I'm happy with whatever everyone else
>>> wants.
>>>
>>> Michael, I'm not arguing that people don't need to understand topologies; I
>>> just think it is like RocksDB: you need to understand it when
>>> debugging/operating, but not in the initial coding, since the metaphor we're
>>> providing at this layer isn't a topology of processors but rather something
>>> like the collections API. Anyhow, it won't hurt people to have it there.
>>>
>>> For the original KStreamBuilder thing, I think that came from the naming we
>>> discussed originally:
>>>
>>>1. "kstreams" - the DSL
>>>2. "processor api" - the lower level callback/topology api
>>>3. KStream/KTable - entities in the kstreams dsl
>>>4. "Kafka Streams" - General name for stream processing stuff in Kafka,
>>>including both kstreams and the processor API plus the underlying
>>>implementation.
>>>
>>> I think referring to the dsl as "kstreams" is cute and mnemonic and not
>>> particularly confusing. Just like referring to the "java collections
>>> library" isn't confusing even though it contains the Iterator interface
>>> which is not actually itself a collection.
>>>
>>> So I think KStreamBuilder should technically have been KstreamsBuilder and
>>> is intended not to be a builder of a KStream but rather the builder for the
>>> kstreams DSL. Okay, yes, that *is* slightly confusing. :-)
>>>
>>> -Jay
>>>
>>> On Wed, Mar 22, 2017 at 11:25 AM, Guozhang Wang  wrote:
>>>
 Regarding the naming of `StreamsTopologyBuilder` vs. `StreamsBuilder` to be
 used in the DSL, I agree both have their arguments:

 1. On one side, people using the DSL layer probably do not need to be aware
 of (or rather, "learn about") the "topology" concept, although this concept
 is a publicly exposed one in Kafka Streams.

 2. On the other side, StreamsBuilder#build() returning a Topology object
 sounds a little weird, at least to me (admittedly a subjective matter).


 Since the second bullet point seems to be more "subjective" and many people
 are not worried about it, I'm OK to go with the other option.


 Guozhang


 On Wed, Mar 22, 2017 at 8:58 AM, Michael Noll 
 wrote:

> Forwarding to kafka-user.
>
>
> -- Forwarded message --
> From: Michael Noll 
> Date: Wed, Mar 22, 2017 at 8:48 AM
> Sub

[jira] [Commented] (KAFKA-4966) Producer throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread Bo Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946408#comment-15946408
 ] 

Bo Wang commented on KAFKA-4966:


Hi, Ismael Juma. I used the 0.10.0.1 version and merged some other issue 
patches, and I found that the code has the problem, and the trunk is still the 
same, so I list the two version.

> Producer throw a NullPointerException under a network environment where 
> packet loss and error packets exist.
> 
>
> Key: KAFKA-4966
> URL: https://issues.apache.org/jira/browse/KAFKA-4966
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1, 0.10.2.0
>Reporter: Bo Wang
>
> 2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | 
> producer-1] |  Uncaught error in request completion:  | 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4966) Producer throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread Bo Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946408#comment-15946408
 ] 

Bo Wang edited comment on KAFKA-4966 at 3/29/17 2:06 AM:
-

Hi, Ismael Juma. I used the 0.10.0.1 version and merged some other issue 
patches, and I found that the code has the problem, and the trunk is still the 
same, so I list the two versions.


was (Author: wangbo23):
Hi, Ismael Juma. I used the 0.10.0.1 version and merged some other issue 
patches, and I found that the code has the problem, and the trunk is still the 
same, so I list the two version.

> Producer throw a NullPointerException under a network environment where 
> packet loss and error packets exist.
> 
>
> Key: KAFKA-4966
> URL: https://issues.apache.org/jira/browse/KAFKA-4966
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1, 0.10.2.0
>Reporter: Bo Wang
>
> 2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | 
> producer-1] |  Uncaught error in request completion:  | 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4966) Producer throw a NullPointerException under a network environment where packet loss and error packets exist.

2017-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946410#comment-15946410
 ] 

Ismael Juma commented on KAFKA-4966:


[~wangbo23], can you please post the stacktrace for the trunk case then?

> Producer throw a NullPointerException under a network environment where 
> packet loss and error packets exist.
> 
>
> Key: KAFKA-4966
> URL: https://issues.apache.org/jira/browse/KAFKA-4966
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1, 0.10.2.0
>Reporter: Bo Wang
>
> 2017-03-27 18:45:54,757 | ERROR | [kafka-producer-network-thread | 
> producer-1] |  Uncaught error in request completion:  | 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:284)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.clients.producer.internals.Sender.canRetry(Sender.java:342)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:310)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:286)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:372)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:282)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:241)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:136)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk7 #2048

2017-03-28 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Support streaming decompression of fetched records for new format

--
[...truncated 900.39 KB...]

org.apache.kafka.streams.kstream.WindowsTest > retentionTimeMustNotBeNegative 
STARTED

org.apache.kafka.streams.kstream.WindowsTest > retentionTimeMustNotBeNegative 
PASSED

org.apache.kafka.streams.kstream.WindowsTest > numberOfSegmentsMustBeAtLeastTwo 
STARTED

org.apache.kafka.streams.kstream.WindowsTest > numberOfSegmentsMustBeAtLeastTwo 
PASSED

org.apache.kafka.streams.kstream.WindowsTest > shouldSetWindowRetentionTime 
STARTED

org.apache.kafka.streams.kstream.WindowsTest > shouldSetWindowRetentionTime 
PASSED

org.apache.kafka.streams.kstream.WindowsTest > shouldSetNumberOfSegments STARTED

org.apache.kafka.streams.kstream.WindowsTest > shouldSetNumberOfSegments PASSED

org.apache.kafka.streams.integration.FanoutIntegrationTest > 
shouldFanoutTheInput STARTED

org.apache.kafka.streams.integration.FanoutIntegrationTest > 
shouldFanoutTheInput PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceSessionWindows PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduce STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduce PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregate STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregate PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCount STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCount PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCountSessionWindows STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldCountSessionWindows PASSED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregateWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
shouldAggregateWindowed PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKTableKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKTableKTable PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > testLeftKTableKTable 
STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > testLeftKTableKTable 
PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKStream STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKStream PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testLeftKStreamKTable PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKTableKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKTableKTable PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKStream STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKStream PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKStreamKStream STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testOuterKStreamKStream PASSED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKTable STARTED

org.apache.kafka.streams.integration.JoinIntegrationTest > 
testInnerKStreamKTable PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduce PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldGroupByKey PASSED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed STARTED

org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest > 
shouldReduceWindowed PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable STARTED

org.apache.kafka.streams.in

Jenkins build is back to normal : kafka-trunk-jdk7 #2049

2017-03-28 Thread Apache Jenkins Server
See 




KIP-98 Message Format Update

2017-03-28 Thread Jason Gustafson
Hi All,

As you may have noticed, the message format patch for KIP-98 was merged
last Friday. This patch also included the message format changes for
KIP-101 and KIP-82. I wanted to mention the notable differences from our
initial proposal:

1. We decided not to implement an unsigned varint type. Representing
unsigned types in Java is awkward and the benefit seemed marginal. We don't
really have any other instances of unsigned types in the protocol, so we
thought it would be simpler and more consistent to only use signed types.

2. KIP-101 adds the partition leader epoch as a new field to the message
batch header. The broker always overrides the value set by the client
before appending to the log. We were concerned about the cost to recompute
the CRC when this field changed, so we relocated the field and excluded it
from the CRC computation. This makes it similar to the offset, which is
also set by the broker and is not covered by the CRC.

3. This is not yet merged, but thanks to some work from Ismael, we intend
to change the checksum algorithm to CRC32C, which will have a native
interface in Java 9. Ismael has done the performance benchmarking and the
results look extremely promising.
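
For reference, Java 9 exposes this algorithm directly as `java.util.zip.CRC32C`
(typically backed by a dedicated CPU instruction on modern hardware). A minimal
usage sketch, assuming a Java 9+ runtime:

{code}
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;  // added in Java 9; implements java.util.zip.Checksum

// Minimal usage sketch of the JDK's CRC32C checksum.
public class Crc32cExample {
    public static void main(String[] args) {
        byte[] batchBytes = "record batch payload".getBytes(StandardCharsets.UTF_8);
        CRC32C crc = new CRC32C();
        crc.update(batchBytes, 0, batchBytes.length);
        System.out.printf("crc32c = 0x%08x%n", crc.getValue());
    }
}
{code}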

See the KIP for the full format:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
.

We also wanted to share some performance results that Apurva has put
together:
https://docs.google.com/spreadsheets/d/1dHY6M7qCiX-NFvsgvaE0YoVdNq26uA8608XIh_DUpI4/edit#gid=61107630.
Overall, it's looking good, but it's worth mentioning that this depends on
disabling the record-level CRC, which we currently compute only to retain
backwards compatibility with the client API:
https://issues.apache.org/jira/browse/KAFKA-4935.

Let us know if you have any feedback.

Thanks,
Jason


[jira] [Created] (KAFKA-4971) Why is there no difference between kafka benchmark tests on SSD and HDD?

2017-03-28 Thread Dasol Kim (JIRA)
Dasol Kim created KAFKA-4971:


 Summary: Why is there no difference between kafka benchmark tests 
on SSD and HDD? 
 Key: KAFKA-4971
 URL: https://issues.apache.org/jira/browse/KAFKA-4971
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.10.0.0
 Environment: Oracle VM VirtualBox
OS : CentOs 7
Memory : 1G
Disk : 8GB
Reporter: Dasol Kim


I installed the OS and Kafka on two SSDs and two HDDs to perform a Kafka 
benchmark test based on the disk difference. I expected the SSD to show 
faster results, but according to my experimental results, there is no big 
difference between SSD and HDD. Why? Other settings were left at their defaults.

*test settings

zookeeper nodes: 1, producer nodes: 2, broker nodes: 2 (SSD 1, HDD 1)
test scenario: two producers send messages to the brokers and compare the 
throughput per second of Kafka installed on SSD vs. Kafka installed on HDD

command : ./bin/kafka-producer-perf-test.sh --num-records 100 --record-size 
2000 --topic test --throughput 10 --producer-props 
bootstrap.servers=SN02:9092


 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2757: MINOR: stabelize flaky ResetIntegrationTest

2017-03-28 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2757

MINOR: stabelize flaky ResetIntegrationTest

We got the test error `org.apache.kafka.common.errors.TopicExistsException: 
Topic 'inputTopic' already exists.` in some builds. It can be reproduced 
reliably on a local machine. The root cause is the async "topic delete" that 
might not be finished before the topic gets re-created.
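
As a generic illustration of the kind of stabilization involved (a sketch under
assumptions, not necessarily what this PR does; the `exists` lookup is a
hypothetical hook), a test can poll until the asynchronous deletion has
completed before re-creating the topic:

{code}
import java.util.function.Predicate;

// Generic sketch of waiting out an asynchronous topic deletion before
// re-creating the topic. The `exists` predicate is a hypothetical hook; the
// actual PR may rely on Kafka's own test utilities instead.
public class WaitForTopicDeletion {

    static void await(String topic, Predicate<String> exists, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (exists.test(topic)) {              // poll the (hypothetical) cluster lookup
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Topic " + topic + " still exists after "
                        + timeoutMs + " ms");
            }
            Thread.sleep(100);                    // brief back-off between checks
        }
    }
}
{code}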

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka minor-fix-resetintegrationtest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2757


commit bc49b9fd4911fac6f9f24ebe4061989583d9245a
Author: Matthias J. Sax 
Date:   2017-03-29T06:23:14Z

MINOR: stabelize flaky ResetIntegrationTest




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-28 Thread Eno Thereska
+1 (non-binding)

Thanks Matthias,
Eno
> On 20 Mar 2017, at 18:27, Matthias J. Sax  wrote:
> 
> Hi,
> 
> I would like to start the vote for KIP-129. Of course, feel free to
> provide some more feedback on the DISCUSS thread.
> 
> Thanks a lot!
> 
> 
> -Matthias
>