Re: [DISCUSS] KIP-133: List and Alter Configs Admin APIs

2017-05-07 Thread James Cheng

> On May 6, 2017, at 11:27 AM, Ismael Juma  wrote:
> 
> Hi James,
> 
> Yes, that's right, it will return all config values. For topic configs,
> that means falling back to the respective broker config value, which could
> also be a default. If we fallback to the broker config (whether it's a
> default or not), is_default will be true. Does this make sense? And do you
> have any concerns related to this?
> 

Ismael,

Makes sense, and I don't have any concerns about it. Actually, I'm excited for 
the capability. That means you can completely tell how a topic is configured, 
without needing to have any knowledge of how the broker is configured.

You said that "if we fallback to the broker config (whether it's a default or 
not), is_default will be true.". I assume that if I were to ListConfig on the 
*broker*, then if a config is the kafka default, then is_default will be true. 
And if the operator specifically set an override for the broker config, then 
is_default will be false?

If that is all correct, then you can also completely tell how a broker is 
configured, without needing to look at its config file. Which is pretty 
convenient.

-James


> Ismael
> 
> On Sat, May 6, 2017 at 6:19 AM, James Cheng  wrote:
> 
>> Hi Ismael,
>> 
>> Thanks for the KIP.
>> 
>> I see that in the ListConfigs Response protocol, that configs have an
>> is_default field. Does that mean that it will include *all* config values,
>> instead of just overridden ones?
>> 
>> As an example, kafka-config.sh --describe on a topic will, right now, only
>> show overridden configs. With ListConfigs, will it show all default configs
>> for the topic, which includes the configs that were inherited from the
>> broker configs (which themselves, might also be defaults)?
>> 
>> Thanks,
>> -James
>> 
>>> On May 4, 2017, at 7:32 PM, Ismael Juma  wrote:
>>> 
>>> Hi all,
>>> 
>>> We've posted "KIP-133: List and Alter Configs Admin APIs" for discussion:
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 133%3A+List+and+Alter+Configs+Admin+APIs
>>> 
>>> This completes the first batch of AdminClient APIs so that topic, config
>>> and ACL management is supported.
>>> 
>>> Please take a look. Your feedback is appreciated.
>>> 
>>> Thanks,
>>> Ismael
>> 
>> 



[GitHub] kafka pull request #2938: KAFKA-5096: Log invalid user configs and use defau...

2017-05-07 Thread johnma14
Github user johnma14 closed the pull request at:

https://github.com/apache/kafka/pull/2938


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5096) Only log invalid user configs and overwrite with correct one

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999731#comment-15999731
 ] 

ASF GitHub Bot commented on KAFKA-5096:
---

Github user johnma14 closed the pull request at:

https://github.com/apache/kafka/pull/2938


> Only log invalid user configs and overwrite with correct one
> 
>
> Key: KAFKA-5096
> URL: https://issues.apache.org/jira/browse/KAFKA-5096
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Mariam John
>Priority: Minor
>  Labels: beginner, newbie
>
> Streams does not allow to overwrite some config parameters (eg, 
> {{enable.auto.commit}}) Currently, we throw an exception, but this is 
> actually not required, as Streams can just ignore/overwrite the user provided 
> value.
> Thus, instead of throwing, we should just log a WARN message and overwrite 
> the config with the values that suits Streams. (atm it's only one parameter 
> {{enable.auto.commit}}), but with exactly-once it's going to be more (cf. 
> KAFKA-4923). Thus, the scope of this ticket depends when it will be 
> implemented (ie, before or after KAFKA-4923).
> This ticket should also include JavaDoc updates that explain what parameters 
> cannot be specified by the user.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2990: KAFKA-5096: Log invalid user configs and use defau...

2017-05-07 Thread johnma14
GitHub user johnma14 opened a pull request:

https://github.com/apache/kafka/pull/2990

KAFKA-5096: Log invalid user configs and use defaults

Kafka Streams does not allow users to modify some consumer configurations.
Currently, it does not allow modifying the value of 'enable.auto.commit'.
If the user modifies this property, currently an exception is thrown.
The following changes were made in this patch:
- Defined a new array 'NON_CONFIGURABLE_CONSUMER_CONFIGS' to hold the names
  of the configuration parameters that is not allowed to be modified
- Defined a new method 'checkIfUnexpectedUserSpecifiedConsumerConfig' to
  check if user overwrote the values of any of the non configurable 
configuration
  parameters. If so, then log a warning message and reset the default values
- Updated the javadoc to include the configuration parameters that cannot be
  modified by users.
- Updated the corresponding tests in StreamsConfigTest.java to reflect the 
changes
  made in StreamsConfig.java

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/johnma14/kafka bug/kafka-5096

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2990.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2990






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5096) Only log invalid user configs and overwrite with correct one

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999734#comment-15999734
 ] 

ASF GitHub Bot commented on KAFKA-5096:
---

GitHub user johnma14 opened a pull request:

https://github.com/apache/kafka/pull/2990

KAFKA-5096: Log invalid user configs and use defaults

Kafka Streams does not allow users to modify some consumer configurations.
Currently, it does not allow modifying the value of 'enable.auto.commit'.
If the user modifies this property, currently an exception is thrown.
The following changes were made in this patch:
- Defined a new array 'NON_CONFIGURABLE_CONSUMER_CONFIGS' to hold the names
  of the configuration parameters that is not allowed to be modified
- Defined a new method 'checkIfUnexpectedUserSpecifiedConsumerConfig' to
  check if user overwrote the values of any of the non configurable 
configuration
  parameters. If so, then log a warning message and reset the default values
- Updated the javadoc to include the configuration parameters that cannot be
  modified by users.
- Updated the corresponding tests in StreamsConfigTest.java to reflect the 
changes
  made in StreamsConfig.java

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/johnma14/kafka bug/kafka-5096

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2990.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2990






> Only log invalid user configs and overwrite with correct one
> 
>
> Key: KAFKA-5096
> URL: https://issues.apache.org/jira/browse/KAFKA-5096
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Mariam John
>Priority: Minor
>  Labels: beginner, newbie
>
> Streams does not allow to overwrite some config parameters (eg, 
> {{enable.auto.commit}}) Currently, we throw an exception, but this is 
> actually not required, as Streams can just ignore/overwrite the user provided 
> value.
> Thus, instead of throwing, we should just log a WARN message and overwrite 
> the config with the values that suits Streams. (atm it's only one parameter 
> {{enable.auto.commit}}), but with exactly-once it's going to be more (cf. 
> KAFKA-4923). Thus, the scope of this ticket depends when it will be 
> implemented (ie, before or after KAFKA-4923).
> This ticket should also include JavaDoc updates that explain what parameters 
> cannot be specified by the user.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-138: Change punctuate semantics

2017-05-07 Thread Eno Thereska
+1 (non binding)

Thanks
Eno
> On May 6, 2017, at 11:01 PM, Bill Bejeck  wrote:
> 
> +1
> 
> Thanks,
> Bill
> 
> On Sat, May 6, 2017 at 5:58 PM, Matthias J. Sax 
> wrote:
> 
>> +1
>> 
>> Thanks a lot for this KIP!
>> 
>> -Matthias
>> 
>> On 5/6/17 10:18 AM, Michal Borowiecki wrote:
>>> Hi all,
>>> 
>>> Given I'm not seeing any contentious issues remaining on the discussion
>>> thread, I'd like to initiate the vote for:
>>> 
>>> KIP-138: Change punctuate semantics
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 138%3A+Change+punctuate+semantics
>>> 
>>> 
>>> Thanks,
>>> Michał
>>> --
>>> Signature
>>>  Michal Borowiecki
>>> Senior Software Engineer L4
>>>  T:  +44 208 742 1600
>>> 
>>> 
>>>  +44 203 249 8448
>>> 
>>> 
>>> 
>>>  E:  michal.borowie...@openbet.com
>>>  W:  www.openbet.com 
>>> 
>>> 
>>>  OpenBet Ltd
>>> 
>>>  Chiswick Park Building 9
>>> 
>>>  566 Chiswick High Rd
>>> 
>>>  London
>>> 
>>>  W4 5XT
>>> 
>>>  UK
>>> 
>>> 
>>> 
>>> 
>>> This message is confidential and intended only for the addressee. If you
>>> have received this message in error, please immediately notify the
>>> postmas...@openbet.com  and delete it
>>> from your system as well as any copies. The content of e-mails as well
>>> as traffic data may be monitored by OpenBet for employment and security
>>> purposes. To protect the environment please do not print this e-mail
>>> unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building
>>> 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company
>>> registered in England and Wales. Registered no. 3134634. VAT no.
>>> GB927523612
>>> 
>> 
>> 



Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-05-07 Thread Eno Thereska
I like this KIP in general and I agree it’s needed. Perhaps Damian can comment 
on the session store issue?

Thanks
Eno
> On May 6, 2017, at 10:32 PM, Michal Borowiecki 
>  wrote:
> 
> Hi Matthias,
> 
> Agreed. I tried your proposal and indeed it would work.
> 
> However, I think to maintain full backward compatibility we would also need 
> to deprecate Stores.create() and leave it unchanged, while providing a new 
> method that returns the more strongly typed Factories.
> 
> ( This is because PersistentWindowFactory and PersistentSessionFactory cannot 
> extend the existing PersistentKeyValueFactory interface, since their build() 
> methods will be returning TypedStateStoreSupplier> and 
> TypedStateStoreSupplier> respectively, which are NOT 
> subclasses of TypedStateStoreSupplier>. I do not see 
> another way around it. Admittedly, my type covariance skills are rudimentary. 
> Does anyone see a better way around this? )
> 
> Since create() takes only the store name as argument, and I don't see what we 
> could overload it with, the new method would need to have a different name. 
> 
> Alternatively, since create(String) is the only method in Stores, we could 
> deprecate the entire class and provide a new one. That would be my 
> preference. Any ideas what to call it?
> 
> 
> 
> All comments and suggestions appreciated.
> 
> 
> 
> Cheers,
> 
> Michał
> 
> 
> On 04/05/17 21:48, Matthias J. Sax wrote:
>> I had a quick look into this.
>> 
>> With regard to backward compatibility, I think it would be required do
>> introduce a new type `TypesStateStoreSupplier` (that extends
>> `StateStoreSupplier`) and to overload all methods that take a
>> `StateStoreSupplier` that accept the new type instead of the current one.
>> 
>> This would allow `.build` to return a `TypedStateStoreSupplier` and
>> thus, would not break any code. As least if I did not miss anything with
>> regard to some magic of type inference using generics (I am not an
>> expert in this field).
>> 
>> 
>> -Matthias
>> 
>> On 5/4/17 11:32 AM, Matthias J. Sax wrote:
>>> Did not have time to have a look. But backward compatibility is a must
>>> from my point of view.
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 5/4/17 12:56 AM, Michal Borowiecki wrote:
 Hello,
 
 I've updated the KIP with missing information.
 
 I would especially appreciate some comments on the compatibility aspects
 of this as the proposed change is not fully backwards-compatible.
 
 In the absence of comments I shall call for a vote in the next few days.
 
 Thanks,
 
 Michal
 
 
 On 30/04/17 23:11, Michal Borowiecki wrote:
> Hi community!
> 
> I have just drafted KIP-147: Add missing type parameters to
> StateStoreSupplier factories and KGroupedStream/Table methods
> 
>  
> 
> 
> Please let me know if this a step in the right direction.
> 
> All comments welcome.
> 
> Thanks,
> Michal
> -- 
> Signature
>     Michal 
> Borowiecki
> Senior Software Engineer L4
>   T:  +44 208 742 1600
> 
>   
>   +44 203 249 8448
> 
>   
>
>   E:  michal.borowie...@openbet.com 
> 
>   W:  www.openbet.com  
>  
> 
>   
>   OpenBet Ltd
> 
>   Chiswick Park Building 9
> 
>   566 Chiswick High Rd
> 
>   London
> 
>   W4 5XT
> 
>   UK
> 
>   
>  
> 
> 
> This message is confidential and intended only for the addressee. If
> you have received this message in error, please immediately notify the
> postmas...@openbet.com  
>   and 
> delete it
> from your system as well as any copies. The content of e-mails as well
> as traffic data may be monitored by OpenBet for employment and
> security purposes. To protect the environment please do not print this
> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
> company registered in England and Wales. Registered no. 3134634. VAT
> no. GB927523612
> 
 -- 
 Signature
  Michal 
 Borowiecki
 Senior Software Engineer L4
T:  +44 208 742 1600
 

+44 203 249 8448
 

 
E:  michal.borowie...@openbet.com 
 
W: 

Kafka Connect Parquet Support?

2017-05-07 Thread Clayton Wohl
With the Kafka Connect S3 sink, I can choose Avro or JSON output format. Is
there any chance that Parquet will be supported?

For record at a time processing, Parquet isn't a good fit. But for
reading/writing batches of records, which is what the Kafka Connect Sink is
writing, Parquet is generally better than Avro.

Would attempting writing support for this be wise to try or not?


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-05-07 Thread Eno Thereska
Hi Kyle,

Thanks for the KIP again. A couple of comments:

- minor: could you add an exact example (similar to what Jay’s example is, or 
like your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we’re exposing this optimization to the DSL. In an 
ideal world, an optimizer would take the existing DSL and do the right thing 
under the covers (create just one state store, arrange the nodes etc). The 
original DSL had a bunch of small, composable pieces (group, aggregate, join) 
that this proposal groups together. I’d like to hear your thoughts on whether 
it’s possible to do this optimization with the current DSL, at the topology 
builder level.

I think there will be scope for several such optimizations in the future and 
perhaps at some point we need to think about decoupling the 1:1 mapping from 
the DSL into the physical topology.

Thanks
Eno

> On May 5, 2017, at 4:39 PM, Jay Kreps  wrote:
> 
> I haven't digested the proposal but the use case is pretty common. An
> example would be the "customer 360" or "unified customer profile" use case
> we often use. In that use case you have a dozen systems each of which has
> some information about your customer (account details, settings, billing
> info, customer service contacts, purchase history, etc). Your goal is to
> join/munge these into a single profile record for each customer that has
> all the relevant info in a usable form and is up-to-date with all the
> source systems. If you implement that with kstreams as a sequence of joins
> i think today we'd fully materialize N-1 intermediate tables. But clearly
> you only need a single stage to group all these things that are already
> co-partitioned. A distributed database would do this under the covers which
> is arguably better (at least when it does the right thing) and perhaps we
> could do the same thing but I'm not sure we know the partitioning so we may
> need an explicit cogroup command that impllies they are already
> co-partitioned.
> 
> -Jay
> 
> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman 
> wrote:
> 
>> Yea thats a good way to look at it.
>> I have seen this type of functionality in a couple other platforms like
>> spark and pig.
>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>> 
>> 
>> On May 5, 2017 7:43 AM, "Damian Guy"  wrote:
>> 
>>> Hi Kyle,
>>> 
>>> If i'm reading this correctly it is like an N way outer join? So an input
>>> on any stream will always produce a new aggregated value - is that
>> correct?
>>> Effectively, each Aggregator just looks up the current value, aggregates
>>> and forwards the result.
>>> I need to look into it and think about it a bit more, but it seems like
>> it
>>> could be a useful optimization.
>>> 
>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman 
>>> wrote:
>>> 
 I sure can. I have added the following description to my KIP. If this
 doesn't help let me know and I will take some more time to build a
>>> diagram
 and make more of a step by step description:
 
 Example with Current API:
 
 KTable table1 =
 builder.stream("topic1").groupByKey().aggregate(initializer1,
>>> aggregator1,
 aggValueSerde1, storeName1);
 KTable table2 =
 builder.stream("topic2").groupByKey().aggregate(initializer2,
>>> aggregator2,
 aggValueSerde2, storeName2);
 KTable table3 =
 builder.stream("topic3").groupByKey().aggregate(initializer3,
>>> aggregator3,
 aggValueSerde3, storeName3);
 KTable cogrouped = table1.outerJoin(table2,
 joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
 
 As you can see this creates 3 StateStores, requires 3 initializers,
>> and 3
 aggValueSerdes. This also adds the pressure to user to define what the
 intermediate values are going to be (V1, V2, V3). They are left with a
 couple choices, first to make V1, V2, and V3 all the same as CG and the
>>> two
 joiners are more like mergers, or second make them intermediate states
>>> such
 as Topic1Map, Topic2Map, and Topic3Map and the joiners use those to
>> build
 the final aggregate CG value. This is something the user could avoid
 thinking about with this KIP.
 
 When a new input arrives lets say at "topic1" it will first go through
>> a
 KStreamAggregate grabbing the current aggregate from storeName1. It
>> will
 produce this in the form of the first intermediate value and get sent
 through a KTableKTableOuterJoin where it will look up the current value
>>> of
 the key in storeName2. It will use the first joiner to calculate the
>>> second
 intermediate value, which will go through an additional
 KTableKTableOuterJoin. Here it will look up the current value of the
>> key
>>> in
 storeName3 and use the second joiner to build the final aggregate
>> value.
 
 If you think through all possibilities for incoming topics you wil

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Michal Borowiecki
Do I understanding correctly, that with the proposed pattern one could 
not pass a lambda expression and access the context from within it?


TBH, I was hoping that for simple things this would be possible:

myStream.map( (key, value, ctx) -> new KeyValue<>(ctx.partition(), value) )

or (more to the original point of this KIP):

myStream.mapValues( (key, value, ctx) -> new MyValueWrapper(value, 
ctx.partition(), ctx.offset()) )


but it looks like that would require subclassing RichFunction? That's a 
bit of an inconvenience IMO.


Cheers,

Michal


On 07/05/17 01:29, Jeyhun Karimov wrote:

Hi,

Thanks for comments. I extended PR and KIP to include rich functions. I
will still have to evaluate the cost of deep copying of keys.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak 
wrote:


Hey Matthias,

My opinion would be that documenting the immutability of the key is the
best approach available.  Better than requiring the key to be serializable
(as with Jeyhun's last pass at the PR), no performance risk.

It'd be different if Java had immutable type constraints of some kind. :-)

Mathieu


On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax 
wrote:


Agreed about RichFunction. If we follow this path, it should cover
all(?) DSL interfaces.

About guarding the key -- I am still not sure what to do about it...
Maybe it might be enough to document it (and name the key parameter like
`readOnlyKey` to make is very clear). Ultimately, I would prefer to
guard against any modification, but I have no good idea how to do this.
Not sure what others think about the risk of corrupted partitioning
(what would be a user error and we could say, well, bad luck, you got a
bug in your code, that's not our fault), vs deep copy with a potential
performance hit (that we can't quantity atm without any performance

test).

We do have a performance system test. Maybe it's worth for you to apply
the deep copy strategy and run the test. It's very basic performance
test only, but might give some insight. If you want to do this, look
into folder "tests" for general test setup, and into
"tests/kafaktests/benchmarks/streams" to find find the perf test.


-Matthias

On 5/5/17 8:55 AM, Jeyhun Karimov wrote:

Hi Matthias,

I think extending KIP to include RichFunctions totally  makes sense.

So,

  we don't want to guard the keys because it is costly.
If we introduce RichFunctions I think it should not be limited only

the 3

interfaces proposed in KIP but to wide range of interfaces.
Please correct me if I am wrong.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax 
One follow up. There was this email on the user list:


http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=

Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+

It might make sense so include Initializer, Adder, and Substractor
inferface, too.

And we should double check if there are other interface we might miss

atm.


-Matthias


On 5/4/17 1:31 PM, Matthias J. Sax wrote:

Thanks for updating the KIP.

Deep copying the key will work for sure, but I am actually a little

bit

worried about performance impact... We might want to do some test to
quantify this impact.


Btw: this remind me about the idea of `RichFunction` interface that
would allow users to access record metadata (like timestamp, offset,
partition etc) within DSL. This would be a similar concept. Thus, I

am

wondering, if it would make sense to enlarge the scope of this KIP by
that? WDYT?



-Matthias


On 5/3/17 2:08 AM, Jeyhun Karimov wrote:

Hi Mathieu,

Thanks for feedback. I followed similar approach and updated PR and

KIP

accordingly. I tried to guard the key in Processors sending a copy

of

an

actual key.
Because I am doing deep copy of an object, I think memory can be

bottleneck

in some use-cases.

Cheers,
Jeyhun

On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <

mathieu.fenn...@replicon.com>

wrote:


Hi Jeyhun,

This approach would change ValueMapper (...etc) to be classes,

rather

than

interfaces, which is also a backwards incompatible change.  An

alternative

approach that would be backwards compatible would be to define new
interfaces, and provide overrides where those interfaces are used.

Unfortunately, making the key parameter as "final" doesn't change

much

about guarding against key change.  It only prevents the parameter

variable

from being reassigned.  If the key type is a mutable object (eg.

byte[]),

it can still be mutated. (eg. key[0] = 0).  But I'm not really sure

there's

much that can be done about that.

Mathieu


On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <

je.kari...@gmail.com

wrote:


Thanks for comments.

The concerns makes sense. Although we can guard for immutable keys

in

current implementation (with few changes), I didn't consider

backward

compatibility.

In this case 2 solutions come to my mind. In both cases, user

accesses

the

key in Object type, as passing extra type parameter will break
backwards-compati

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Jeyhun Karimov
Hi Michal,

Thanks for your comments. We proposed the similar solution to yours in KIP
(please look at rejected alternatives). However, after the discussion in
mailing list I extended it to rich functions. Maybe we should keep them
both: simple and rich versions.

Cheers,
Jeyhun

On Sun, May 7, 2017 at 11:46 AM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Do I understanding correctly, that with the proposed pattern one could not
> pass a lambda expression and access the context from within it?
>
> TBH, I was hoping that for simple things this would be possible:
>
> myStream.map( (key, value, ctx) -> new KeyValue<>(ctx.partition(), value) )
>
> or (more to the original point of this KIP):
>
> myStream.mapValues( (key, value, ctx) -> new MyValueWrapper(value,
> ctx.partition(), ctx.offset()) )
>
> but it looks like that would require subclassing RichFunction? That's a
> bit of an inconvenience IMO.
>
> Cheers,
>
> Michal
>
> On 07/05/17 01:29, Jeyhun Karimov wrote:
>
> Hi,
>
> Thanks for comments. I extended PR and KIP to include rich functions. I
> will still have to evaluate the cost of deep copying of keys.
>
> Cheers,
> Jeyhun
>
> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak  
> 
> wrote:
>
>
> Hey Matthias,
>
> My opinion would be that documenting the immutability of the key is the
> best approach available.  Better than requiring the key to be serializable
> (as with Jeyhun's last pass at the PR), no performance risk.
>
> It'd be different if Java had immutable type constraints of some kind. :-)
>
> Mathieu
>
>
> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax  
> 
> wrote:
>
>
> Agreed about RichFunction. If we follow this path, it should cover
> all(?) DSL interfaces.
>
> About guarding the key -- I am still not sure what to do about it...
> Maybe it might be enough to document it (and name the key parameter like
> `readOnlyKey` to make is very clear). Ultimately, I would prefer to
> guard against any modification, but I have no good idea how to do this.
> Not sure what others think about the risk of corrupted partitioning
> (what would be a user error and we could say, well, bad luck, you got a
> bug in your code, that's not our fault), vs deep copy with a potential
> performance hit (that we can't quantity atm without any performance
>
> test).
>
>
> We do have a performance system test. Maybe it's worth for you to apply
> the deep copy strategy and run the test. It's very basic performance
> test only, but might give some insight. If you want to do this, look
> into folder "tests" for general test setup, and into
> "tests/kafaktests/benchmarks/streams" to find find the perf test.
>
>
> -Matthias
>
> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>
> Hi Matthias,
>
> I think extending KIP to include RichFunctions totally  makes sense.
>
> So,
>
>  we don't want to guard the keys because it is costly.
> If we introduce RichFunctions I think it should not be limited only
>
> the 3
>
> interfaces proposed in KIP but to wide range of interfaces.
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun
>
> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax 
>  wrote:
>
>
> One follow up. There was this email on the user list:
>
> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=
>
> Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>
>
> It might make sense so include Initializer, Adder, and Substractor
> inferface, too.
>
> And we should double check if there are other interface we might miss
>
> atm.
>
>
>
> -Matthias
>
>
> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>
> Thanks for updating the KIP.
>
> Deep copying the key will work for sure, but I am actually a little
>
> bit
>
> worried about performance impact... We might want to do some test to
> quantify this impact.
>
>
> Btw: this remind me about the idea of `RichFunction` interface that
> would allow users to access record metadata (like timestamp, offset,
> partition etc) within DSL. This would be a similar concept. Thus, I
>
> am
>
> wondering, if it would make sense to enlarge the scope of this KIP by
> that? WDYT?
>
>
>
> -Matthias
>
>
> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>
> Hi Mathieu,
>
> Thanks for feedback. I followed similar approach and updated PR and
>
> KIP
>
> accordingly. I tried to guard the key in Processors sending a copy
>
> of
>
> an
>
> actual key.
> Because I am doing deep copy of an object, I think memory can be
>
> bottleneck
>
> in some use-cases.
>
> Cheers,
> Jeyhun
>
> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <
>
> mathieu.fenn...@replicon.com>
>
> wrote:
>
>
> Hi Jeyhun,
>
> This approach would change ValueMapper (...etc) to be classes,
>
> rather
>
> than
>
> interfaces, which is also a backwards incompatible change.  An
>
> alternative
>
> approach that would be backwards compatible would be to define new
> interfaces, and provide overrides where those interfaces are used.
>
> Unfortunately, making the key parameter as "final" doesn't change
>
> much
>
> about guarding aga

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Michal Borowiecki

Hi Jeyhun,

Thanks for your quick reply.

Indeed, I understand the existing ValueMapper/Joiner etc. have to remain 
as-is for backwards compatibility.


I was just expressing my surprise that their proposed richer equivalents 
weren't functional interfaces too.


Thanks,

Michał


On 07/05/17 12:32, Jeyhun Karimov wrote:

Hi Michal,

Thanks for your comments. We proposed the similar solution to yours in 
KIP (please look at rejected alternatives). However, after the 
discussion in mailing list I extended it to rich functions. Maybe we 
should keep them both: simple and rich versions.


Cheers,
Jeyhun

On Sun, May 7, 2017 at 11:46 AM Michal Borowiecki 
mailto:michal.borowie...@openbet.com>> 
wrote:


Do I understanding correctly, that with the proposed pattern one
could not pass a lambda expression and access the context from
within it?

TBH, I was hoping that for simple things this would be possible:

myStream.map( (key, value, ctx) -> new KeyValue<>(ctx.partition(),
value) )

or (more to the original point of this KIP):

myStream.mapValues( (key, value, ctx) -> new MyValueWrapper(value,
ctx.partition(), ctx.offset()) )

but it looks like that would require subclassing RichFunction?
That's a bit of an inconvenience IMO.

Cheers,

Michal


On 07/05/17 01:29, Jeyhun Karimov wrote:

Hi,

Thanks for comments. I extended PR and KIP to include rich functions. I
will still have to evaluate the cost of deep copying of keys.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak 

wrote:


Hey Matthias,

My opinion would be that documenting the immutability of the key is the
best approach available.  Better than requiring the key to be serializable
(as with Jeyhun's last pass at the PR), no performance risk.

It'd be different if Java had immutable type constraints of some kind. :-)

Mathieu


On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax 

wrote:


Agreed about RichFunction. If we follow this path, it should cover
all(?) DSL interfaces.

About guarding the key -- I am still not sure what to do about it...
Maybe it might be enough to document it (and name the key parameter like
`readOnlyKey` to make is very clear). Ultimately, I would prefer to
guard against any modification, but I have no good idea how to do this.
Not sure what others think about the risk of corrupted partitioning
(what would be a user error and we could say, well, bad luck, you got a
bug in your code, that's not our fault), vs deep copy with a potential
performance hit (that we can't quantity atm without any performance

test).

We do have a performance system test. Maybe it's worth for you to apply
the deep copy strategy and run the test. It's very basic performance
test only, but might give some insight. If you want to do this, look
into folder "tests" for general test setup, and into
"tests/kafaktests/benchmarks/streams" to find find the perf test.


-Matthias

On 5/5/17 8:55 AM, Jeyhun Karimov wrote:

Hi Matthias,

I think extending KIP to include RichFunctions totally  makes sense.

So,

  we don't want to guard the keys because it is costly.
If we introduce RichFunctions I think it should not be limited only

the 3

interfaces proposed in KIP but to wide range of interfaces.
Please correct me if I am wrong.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax mailto:matth...@confluent.io>
wrote:


One follow up. There was this email on the user list:


http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=

Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+

It might make sense so include Initializer, Adder, and Substractor
inferface, too.

And we should double check if there are other interface we might miss

atm.

-Matthias


On 5/4/17 1:31 PM, Matthias J. Sax wrote:

Thanks for updating the KIP.

Deep copying the key will work for sure, but I am actually a little

bit

worried about performance impact... We might want to do some test to
quantify this impact.


Btw: this remind me about the idea of `RichFunction` interface that
would allow users to access record metadata (like timestamp, offset,
partition etc) within DSL. This would be a similar concept. Thus, I

am

wondering, if it would make sense to enlarge the scope of this KIP by
that? WDYT?



-Matthias


On 5/3/17 2:08 AM, Jeyhun Karimov wrote:

Hi Mathieu,

Thanks for feedback. I followed similar approach and updated PR and

KIP

accordingly. I tried to guard the key in Processors sending a copy

of

an

actual key.
Because I am doing deep copy of an object, I think memory can be

bottleneck

in some use-cases.

C

Re: [DISCUSS] KIP-133: List and Alter Configs Admin APIs

2017-05-07 Thread Ismael Juma
Hi James,

On Sun, May 7, 2017 at 8:36 AM, James Cheng  wrote:

> Makes sense, and I don't have any concerns about it. Actually, I'm excited
> for the capability. That means you can completely tell how a topic is
> configured, without needing to have any knowledge of how the broker is
> configured.
>

That's the goal indeed.

Unrelated to this KIP, but important to keep in mind is that all brokers
must have the same topic configs for things to work in a sane manner. That
is a peculiarity of how topic configs work in Kafka: the fallback to broker
topic configs is done during load, not during store. This makes it possible
to update all topic config defaults by updating the config in every broker
following by a rolling restart (or by upgrading to a new version of Kafka
where broker topic config defaults has changed), but it can also be a
source of weird behaviour if the broker topic configs are not consistent.
There are a few ways this could be improved, but that's a subject for
another KIP. :)

You said that "if we fallback to the broker config (whether it's a default
> or not), is_default will be true.". I assume that if I were to ListConfig
> on the *broker*, then if a config is the kafka default, then is_default
> will be true. And if the operator specifically set an override for the
> broker config, then is_default will be false?
>

That's right.

If that is all correct, then you can also completely tell how a broker is
> configured, without needing to look at its config file. Which is pretty
> convenient.
>

Yeah, that is indeed the goal as well. :)

I'll update the KIP to clarify these important points.

Thanks,
Ismael


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-05-07 Thread Kyle Winkelman
*- minor: could you add an exact example (similar to what Jay’s example is,
or like your Spark/Pig pointers had) to make this super concrete?*
I have added a more concrete example to the KIP.

*- my main concern is that we’re exposing this optimization to the DSL. In
an ideal world, an optimizer would take the existing DSL and do the right
thing under the covers (create just one state store, arrange the nodes
etc). The original DSL had a bunch of small, composable pieces (group,
aggregate, join) that this proposal groups together. I’d like to hear your
thoughts on whether it’s possible to do this optimization with the current
DSL, at the topology builder level.*
You would have to make a lot of checks to understand if it is even possible
to make this optimization:
1. Make sure they are all KTableKTableOuterJoins
2. None of the intermediate KTables are used for anything else.
3. None of the intermediate stores are used. (This may be impossible
especially if they use KafkaStreams#store after the topology has already
been built.)
You would then need to make decisions during the optimization:
1. Your new initializer would the composite of all the individual
initializers and the valueJoiners.
2. I am having a hard time thinking about how you would turn the
aggregators and valueJoiners into an aggregator that would work on the
final object, but this may be possible.
3. Which state store would you use? The ones declared would be for the
aggregate values. None of the declared ones would be guaranteed to hold the
final object. This would mean you must created a new state store and not
created any of the declared ones.

The main argument I have against it is even if it could be done I don't
know that we would want to have this be an optimization in the background
because the user would still be required to think about all of the
intermediate values that they shouldn't need to worry about if they only
care about the final object.

In my opinion cogroup is a common enough case that it should be part of the
composable pieces (group, aggregate, join) because we want to allow people
to join more than 2 or more streams in an easy way. Right now I don't think
we give them ways of handling this use case easily.

*-I think there will be scope for several such optimizations in the future
and perhaps at some point we need to think about decoupling the 1:1 mapping
from the DSL into the physical topology.*
I would argue that cogroup is not just an optimization it is a new way for
the users to look at accomplishing a problem that requires multiple
streams. I may sound like a broken record but I don't think users should
have to build the N-1 intermediate tables and deal with their initializers,
serdes and stores if all they care about is the final object.
Now if for example someone uses cogroup but doesn't supply additional
streams and aggregators this case is equivalent to a single grouped stream
making an aggregate call. This case is what I view an optimization as, we
could remove the KStreamCogroup and act as if there was just a call to
KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I
would prefer to just write a warning saying that this is not how cogroup is
to be used.)

Thanks,
Kyle

On Sun, May 7, 2017 at 5:41 AM, Eno Thereska  wrote:

> Hi Kyle,
>
> Thanks for the KIP again. A couple of comments:
>
> - minor: could you add an exact example (similar to what Jay’s example is,
> or like your Spark/Pig pointers had) to make this super concrete?
>
> - my main concern is that we’re exposing this optimization to the DSL. In
> an ideal world, an optimizer would take the existing DSL and do the right
> thing under the covers (create just one state store, arrange the nodes
> etc). The original DSL had a bunch of small, composable pieces (group,
> aggregate, join) that this proposal groups together. I’d like to hear your
> thoughts on whether it’s possible to do this optimization with the current
> DSL, at the topology builder level.
>
> I think there will be scope for several such optimizations in the future
> and perhaps at some point we need to think about decoupling the 1:1 mapping
> from the DSL into the physical topology.
>
> Thanks
> Eno
>
> > On May 5, 2017, at 4:39 PM, Jay Kreps  wrote:
> >
> > I haven't digested the proposal but the use case is pretty common. An
> > example would be the "customer 360" or "unified customer profile" use
> case
> > we often use. In that use case you have a dozen systems each of which has
> > some information about your customer (account details, settings, billing
> > info, customer service contacts, purchase history, etc). Your goal is to
> > join/munge these into a single profile record for each customer that has
> > all the relevant info in a usable form and is up-to-date with all the
> > source systems. If you implement that with kstreams as a sequence of
> joins
> > i think today we'd fully materialize N-1 intermediate tables. But clearly
> > you only need a single sta

[GitHub] kafka pull request #2991: KAFKA-4208: Add Record Headers

2017-05-07 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/kafka/pull/2991

KAFKA-4208: Add Record Headers

Update upgrade.html

Raising this now, as KIP-118 is pulled from release as such submitting this 
without java 8 changes.

As per remaining review comment from 
https://github.com/apache/kafka/pull/2772, updating the upgrade notes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/IG-Group/kafka KIP-82

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2991


commit 326a781cba94030c744c3a0cf26355b5d72f9282
Author: Michael Andre Pearce 
Date:   2017-05-07T16:30:29Z

KAFKA-4208: Add Record Headers

Update upgrade.html




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4208) Add Record Headers

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1530#comment-1530
 ] 

ASF GitHub Bot commented on KAFKA-4208:
---

GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/kafka/pull/2991

KAFKA-4208: Add Record Headers

Update upgrade.html

Raising this now, as KIP-118 is pulled from release as such submitting this 
without java 8 changes.

As per remaining review comment from 
https://github.com/apache/kafka/pull/2772, updating the upgrade notes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/IG-Group/kafka KIP-82

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2991


commit 326a781cba94030c744c3a0cf26355b5d72f9282
Author: Michael Andre Pearce 
Date:   2017-05-07T16:30:29Z

KAFKA-4208: Add Record Headers

Update upgrade.html




> Add Record Headers
> --
>
> Key: KAFKA-4208
> URL: https://issues.apache.org/jira/browse/KAFKA-4208
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Michael Andre Pearce (IG)
>Priority: Critical
> Fix For: 0.11.0.0
>
>
> Currently headers are not natively supported unlike many transport and 
> messaging platforms or standard, this is to add support for headers to kafka
> This JIRA is related to KIP found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5185) Adding the RecordMetadata that is returned by the producer to the commitRecord method for SourceTask

2017-05-07 Thread George Smith (JIRA)
George Smith created KAFKA-5185:
---

 Summary: Adding the RecordMetadata that is returned by the 
producer to the commitRecord method for SourceTask
 Key: KAFKA-5185
 URL: https://issues.apache.org/jira/browse/KAFKA-5185
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: George Smith
 Fix For: 0.11.0.0


An improvement request I thought would be useful.

Added the producers record metadata object to the commitRecord method on the 
SourceTask class so more data is provided from the producer and it allows 
anyone overriding and hooking into the commitRecord method to receive more 
information about where the record was procuded to. 

Left the old commitRecord method with just the sourcerecord for backwards 
compatbility even though this would technically be included in a new version of 
kafka, it would intoduce a breaking change without it. 

Opened up PR here: https://github.com/apache/kafka/pull/2989



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2992: KIP-147 WIP for discussion

2017-05-07 Thread mihbor
GitHub user mihbor opened a pull request:

https://github.com/apache/kafka/pull/2992

KIP-147 WIP for discussion

This is WIP to help discussion, do not merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mihbor/kafka KIP-147-wip

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2992


commit 46d31c1ddb9bcbea81c1515a1048b36d284a2fa3
Author: Michal Borowiecki 
Date:   2017-05-07T18:08:41Z

KIP-147 WIP for discussion




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-147: Add missing type parameters to StateStoreSupplier factories and KGroupedStream/Table methods

2017-05-07 Thread Michal Borowiecki

To visualise this better, I've created a WIP PR:

https://github.com/apache/kafka/pull/2992

tentatively having named the new Stores class TypedStores.

Thanks,
Michał

On 07/05/17 10:16, Eno Thereska wrote:

I like this KIP in general and I agree it’s needed. Perhaps Damian can comment 
on the session store issue?

Thanks
Eno

On May 6, 2017, at 10:32 PM, Michal Borowiecki  
wrote:

Hi Matthias,

Agreed. I tried your proposal and indeed it would work.

However, I think to maintain full backward compatibility we would also need to 
deprecate Stores.create() and leave it unchanged, while providing a new method 
that returns the more strongly typed Factories.

( This is because PersistentWindowFactory and PersistentSessionFactory cannot extend the existing 
PersistentKeyValueFactory interface, since their build() methods will be returning 
TypedStateStoreSupplier> and TypedStateStoreSupplier> 
respectively, which are NOT subclasses of TypedStateStoreSupplier>. I do not see 
another way around it. Admittedly, my type covariance skills are rudimentary. Does anyone see a better way around 
this? )

Since create() takes only the store name as argument, and I don't see what we 
could overload it with, the new method would need to have a different name.

Alternatively, since create(String) is the only method in Stores, we could 
deprecate the entire class and provide a new one. That would be my preference. 
Any ideas what to call it?



All comments and suggestions appreciated.



Cheers,

Michał


On 04/05/17 21:48, Matthias J. Sax wrote:

I had a quick look into this.

With regard to backward compatibility, I think it would be required do
introduce a new type `TypesStateStoreSupplier` (that extends
`StateStoreSupplier`) and to overload all methods that take a
`StateStoreSupplier` that accept the new type instead of the current one.

This would allow `.build` to return a `TypedStateStoreSupplier` and
thus, would not break any code. As least if I did not miss anything with
regard to some magic of type inference using generics (I am not an
expert in this field).


-Matthias

On 5/4/17 11:32 AM, Matthias J. Sax wrote:

Did not have time to have a look. But backward compatibility is a must
from my point of view.

-Matthias


On 5/4/17 12:56 AM, Michal Borowiecki wrote:

Hello,

I've updated the KIP with missing information.

I would especially appreciate some comments on the compatibility aspects
of this as the proposed change is not fully backwards-compatible.

In the absence of comments I shall call for a vote in the next few days.

Thanks,

Michal


On 30/04/17 23:11, Michal Borowiecki wrote:

Hi community!

I have just drafted KIP-147: Add missing type parameters to
StateStoreSupplier factories and KGroupedStream/Table methods
 


Please let me know if this a step in the right direction.

All comments welcome.

Thanks,
Michal
--
Signature
  Michal Borowiecki
Senior Software Engineer L4
T:  +44 208 742 1600


+44 203 249 8448



E:  michal.borowie...@openbet.com 

W:  www.openbet.com   



OpenBet Ltd

Chiswick Park Building 9

566 Chiswick High Rd

London

W4 5XT

UK


 

This message is confidential and intended only for the addressee. If
you have received this message in error, please immediately notify the
postmas...@openbet.com  
  and delete it
from your system as well as any copies. The content of e-mails as well
as traffic data may be monitored by OpenBet for employment and
security purposes. To protect the environment please do not print this
e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
company registered in England and Wales. Registered no. 3134634. VAT
no. GB927523612


--
Signature
  Michal Borowiecki
Senior Software Engineer L4
T:  +44 208 742 1600


+44 203 249 8448



E:  michal.borowie...@openbet.com 

W:  www.openbet.com   



OpenBet Ltd

Chiswick Park Building 9

566 Chiswick High Rd

London

W4 5XT

UK


 

This message i

Re: [DISCUSS] KIP-133: List and Alter Configs Admin APIs

2017-05-07 Thread Colin McCabe
On Fri, May 5, 2017, at 19:24, Ismael Juma wrote:
> Hi Colin,
> 
> Thanks for the review. Comments inline.
> 
> On Fri, May 5, 2017 at 9:45 PM, Colin McCabe  wrote:
> 
> > As Jun commented, it's probably more consistent to use
> > Collection, unless this really needs to be an ordered
> > list.  That way people can use any container they want.
> >
> 
> Agreed, updated the KIP.
> 
> > public class ListConfigsResult {
> > >public KafkaFuture> all();
> > > }
> >
> > This should be Map>, right?  Since we
> > can have multiple configurations per entity.
> >
> 
> Yes, indeed.
> 
> With the "all" function, if any listing of a ConfigEntity got an error,
> > the whole future completes exceptionally and you get no results for
> > anything.  Which is fine in some cases, but not what you want in others.
> >  So it would be nice to have a function in ListConfigsResult like:
> >
> > >public Map>>> configs();
> >
> 
> I considered that, but the proposed protocol cannot fail on a per entity
> basis during listing. So, I thought we could leave this method out for
> now.
> We could introduce it if per entity failures ever became possible. What
> do
> you think?

Hi Ismael,

Hmm.  What's the behavior if I try to list the configuration for a topic
that doesn't exist?  It seems like in this case, the whole request has
to return an error, and nothing gets listed.  Shouldn't the protocol
should return an error code and message per element in the batch,
similar to how the other batch protocols work (CreateTopics,
DeleteTopics, etc. etc.)

We also should think about the case where someone requests
configurations from multiple brokers.  Since there are multiple requests
sent over the wire in this case, there is the chance for some of them to
fail when others have succeeded.  So the API needs a way of returning an
error per batch element.

> 
> > A new ACL resource type `Configs` will be introduced with Describe, Alter
> > > and All operations. Initially, this resource type will work at a global
> > > level like the Cluster resource type. That is, the only valid resource
> > > name will be "kafka-cluster"
> >
> > Hmm.  Having a new "kafka-cluster" resource seems like it duplicates the
> > existing Cluster resource, which is also a singleton and also represents
> > the whole cluster.
> 
> 
> I was thinking of it as the Configs resource for all of the cluster as
> opposed to the Configs resource for a specific entity. So, in the future,
> the resource name could represent specific entities. Given that, I don't
> think it duplicates the existing Cluster resource.
> 
> 
> > Configurations don't really seem like a separate
> > resource type like topics or brokers-- they seem like an aspect of the
> > existing resources.
> 
> 
> Well, a broker is part of the cluster resource, but also a resource
> itself.
> So, I think it's reasonable to apply the same principle to configs.

While it's true that broker resources are a part of the cluster
resource, the brokers that we have in a cluster can change over time,
and are different for each cluster.  This makes it interesting to talk
about brokers separately from clusters.  Topics are similar-- every
cluster will have a different set of topics, so there is a clear reason
to have topic resources that are separate from the cluster resource.

On the other hand, with configurations, each topic will have a single
configuration, never more and never less.  Each cluster will have a
single configuration-- never more and never less.  So having separate
Configuration resources doesn't seem to add any value, since they will
always map 1:1 to existing resources.

> 
> What about instead having an action for reading configs and an action
> > for writing configs.  So having READ_CONFIGS on the CLUSTER resource
> > would give you the ability to read all configs, and having WRITE_CONFIGS
> > on the CLUSTER resource would give you the ability to write all configs.
> >  And being superuser, or having ALL would give you both READ_CONFIGS and
> > WRITE_CONFIGS.  Later on we can make it finer grained by granting
> > READ_CONFIGS on specific TOPIC objects for specific principals, and so
> > on.
> >
> 
> Not sure. It seems like operations are meant to be generic (ClusterAction
> is probably the exception) whereas READ_CONFIGS seems to combine an
> operation and a resource (although you did say that you don't see Configs
> as a resource).

I think it's better to have specific operations than generic ones, since
it gives administrators finer-grained control and makes it clearer what
the operations mean.  Generic-sounding operations leave people guessing
what they actually do.

It seems natural to say that having ALL on a resource like a broker
should grant you the ability to READ_CONFIGS / WRITE_CONFIGS on that
broker as well.  Some generic operations should contain more specific
ones, like we do today with DESCRIBE and READ, etc, and ALL.

> 
> > public class Config {
> > >public Config(String name, String va

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Matthias J. Sax
Michal,

thanks a lot for this comment. I did not consider lambdas when proposing
RichFunctions. I thinks it very important to preserve the ability to use
lambdas!

@Jeyhun: I did not put any thought into this, but can we have a design
that allows for both? Also, with regard to lambdas, it might make sense
to allow for both `V -> newV` and `(K, V) -> newV` ?


-Matthias


On 5/7/17 5:34 AM, Michal Borowiecki wrote:
> Hi Jeyhun,
> 
> Thanks for your quick reply.
> 
> Indeed, I understand the existing ValueMapper/Joiner etc. have to remain
> as-is for backwards compatibility.
> 
> I was just expressing my surprise that their proposed richer equivalents
> weren't functional interfaces too.
> 
> Thanks,
> 
> Michał
> 
> 
> On 07/05/17 12:32, Jeyhun Karimov wrote:
>> Hi Michal,
>>
>> Thanks for your comments. We proposed the similar solution to yours in
>> KIP (please look at rejected alternatives). However, after the
>> discussion in mailing list I extended it to rich functions. Maybe we
>> should keep them both: simple and rich versions.
>>
>> Cheers,
>> Jeyhun
>>
>> On Sun, May 7, 2017 at 11:46 AM Michal Borowiecki
>> mailto:michal.borowie...@openbet.com>>
>> wrote:
>>
>> Do I understanding correctly, that with the proposed pattern one
>> could not pass a lambda expression and access the context from
>> within it?
>>
>> TBH, I was hoping that for simple things this would be possible:
>>
>> myStream.map( (key, value, ctx) -> new KeyValue<>(ctx.partition(),
>> value) )
>>
>> or (more to the original point of this KIP):
>>
>> myStream.mapValues( (key, value, ctx) -> new MyValueWrapper(value,
>> ctx.partition(), ctx.offset()) )
>>
>> but it looks like that would require subclassing RichFunction?
>> That's a bit of an inconvenience IMO.
>>
>> Cheers,
>>
>> Michal
>>
>>
>> On 07/05/17 01:29, Jeyhun Karimov wrote:
>>> Hi,
>>>
>>> Thanks for comments. I extended PR and KIP to include rich functions. I
>>> will still have to evaluate the cost of deep copying of keys.
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak 
>>>  
>>> wrote:
>>>
 Hey Matthias,

 My opinion would be that documenting the immutability of the key is the
 best approach available.  Better than requiring the key to be 
 serializable
 (as with Jeyhun's last pass at the PR), no performance risk.

 It'd be different if Java had immutable type constraints of some kind. 
 :-)

 Mathieu


 On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax 
  
 wrote:

> Agreed about RichFunction. If we follow this path, it should cover
> all(?) DSL interfaces.
>
> About guarding the key -- I am still not sure what to do about it...
> Maybe it might be enough to document it (and name the key parameter 
> like
> `readOnlyKey` to make is very clear). Ultimately, I would prefer to
> guard against any modification, but I have no good idea how to do 
> this.
> Not sure what others think about the risk of corrupted partitioning
> (what would be a user error and we could say, well, bad luck, you got 
> a
> bug in your code, that's not our fault), vs deep copy with a potential
> performance hit (that we can't quantity atm without any performance
 test).
> We do have a performance system test. Maybe it's worth for you to 
> apply
> the deep copy strategy and run the test. It's very basic performance
> test only, but might give some insight. If you want to do this, look
> into folder "tests" for general test setup, and into
> "tests/kafaktests/benchmarks/streams" to find find the perf test.
>
>
> -Matthias
>
> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>> Hi Matthias,
>>
>> I think extending KIP to include RichFunctions totally  makes sense.
 So,
>>  we don't want to guard the keys because it is costly.
>> If we introduce RichFunctions I think it should not be limited only
 the 3
>> interfaces proposed in KIP but to wide range of interfaces.
>> Please correct me if I am wrong.
>>
>> Cheers,
>> Jeyhun
>>
>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax 
>> mailto:matth...@confluent.io>
>> wrote:
>>
>>> One follow up. There was this email on the user list:
>>>
>>>
>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=
> Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>> It might make sense so include Initializer, Adder, and Substractor
>>> inferface, too.
>>>
>>> And we should double check if there are other interface we might 
>>> mis

[jira] [Commented] (KAFKA-5185) Adding the RecordMetadata that is returned by the producer to the commitRecord method for SourceTask

2017-05-07 Thread Konstantine Karantasis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1688#comment-1688
 ] 

Konstantine Karantasis commented on KAFKA-5185:
---

Thank you [~GeoSmith] for the suggested improvement and the patch. FYI this is 
a change to a public facing Kafka Connect API, therefore to review and merge 
this patch it first has to be submitted, discussed and approved as a KIP. 
Here's a link describing how the Kafka Improvement Proposal (KIP) process 
works: [KIP 
process|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

> Adding the RecordMetadata that is returned by the producer to the 
> commitRecord method for SourceTask
> 
>
> Key: KAFKA-5185
> URL: https://issues.apache.org/jira/browse/KAFKA-5185
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: George Smith
> Fix For: 0.11.0.0
>
>
> An improvement request I thought would be useful.
> Added the producers record metadata object to the commitRecord method on the 
> SourceTask class so more data is provided from the producer and it allows 
> anyone overriding and hooking into the commitRecord method to receive more 
> information about where the record was procuded to. 
> Left the old commitRecord method with just the sourcerecord for backwards 
> compatbility even though this would technically be included in a new version 
> of kafka, it would intoduce a breaking change without it. 
> Opened up PR here: https://github.com/apache/kafka/pull/2989



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5186) Avoid expensive initialization of producer state when upgrading

2017-05-07 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5186:
--

 Summary: Avoid expensive initialization of producer state when 
upgrading
 Key: KAFKA-5186
 URL: https://issues.apache.org/jira/browse/KAFKA-5186
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Priority: Critical


Currently the producer state is always loaded upon broker initialization. If we 
don't find a snapshot file to load from, then we scan the log segments from the 
beginning to rebuild the state. Of course, when users upgrade to the new 
version, there will be no snapshot file, so the upgrade could be quite 
intensive. It would be nice to avoid this by assuming instead that the absence 
of a snapshot file means that the producer state should start clean and we can 
avoid the expensive scanning.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5187) producer.close() times out at 30 seconds

2017-05-07 Thread Somnath Choudhuri (JIRA)
Somnath Choudhuri created KAFKA-5187:


 Summary: producer.close() times out at 30 seconds
 Key: KAFKA-5187
 URL: https://issues.apache.org/jira/browse/KAFKA-5187
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.1.1
Reporter: Somnath Choudhuri


In the code below, send() is successful and returns immediately. However 
producer.close() hangs and times out after 30 seconds.

 producer.send(new ProducerRecord(topic, "0", 
"test string 0")); 
 System.out.println("Send successful");
 
 long start_time = System.currentTimeMillis();
 producer.close();
 long end_time = System.currentTimeMillis();
 
 System.out.println("Time spent in closing Producer: " + 
(end_time - start_time));



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5185) Adding the RecordMetadata that is returned by the producer to the commitRecord method for SourceTask

2017-05-07 Thread George Smith (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000103#comment-16000103
 ] 

George Smith commented on KAFKA-5185:
-

[~kkonstantine]  Thanks, will do.

> Adding the RecordMetadata that is returned by the producer to the 
> commitRecord method for SourceTask
> 
>
> Key: KAFKA-5185
> URL: https://issues.apache.org/jira/browse/KAFKA-5185
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: George Smith
> Fix For: 0.11.0.0
>
>
> An improvement request I thought would be useful.
> Added the producers record metadata object to the commitRecord method on the 
> SourceTask class so more data is provided from the producer and it allows 
> anyone overriding and hooking into the commitRecord method to receive more 
> information about where the record was procuded to. 
> Left the old commitRecord method with just the sourcerecord for backwards 
> compatbility even though this would technically be included in a new version 
> of kafka, it would intoduce a breaking change without it. 
> Opened up PR here: https://github.com/apache/kafka/pull/2989



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5188) Add Integration tests for transactional producer

2017-05-07 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5188:
--

 Summary: Add Integration tests for transactional producer
 Key: KAFKA-5188
 URL: https://issues.apache.org/jira/browse/KAFKA-5188
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Apurva Mehta
Priority: Critical






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP 151 - Expose Connector type in REST API

2017-05-07 Thread Konstantine Karantasis
Nice. Thanks!

-Konstantine

On Sat, May 6, 2017 at 10:43 PM, dan  wrote:

> thanks for the feedback, it all sounds good. i have made the changes to the
> pr and the kip.
>
> dan
>
> On Fri, May 5, 2017 at 9:29 AM, Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Thank you for the KIP. It's a nice improvement.
> >
> > Two small suggestions:
> >
> > 1) Let's not use all caps to describe the type of the connector. "Source"
> > and "Sink" seem more appropriate (but even all lower case would be
> better).
> > 2) It's been discussed in other contexts recently, but I believe finally
> > exposing a connector's version here makes more sense than anywhere else
> at
> > the moment. There's an existing interface method to grab the version, and
> > publishing it through REST is not affected by any conventions made with
> > respect to versioning format (also sorting based on name and version I
> > guess is a concern that can be postponed to when we support multiple
> > versions of the same connector and this doesn't have to be addressed on a
> > KIP anyways).
> >
> > Let me know what you think. I'll add comments to the PR as well.
> > Thanks again.
> >
> > -Konstantine
> >
> > On Thu, May 4, 2017 at 4:20 PM, Gwen Shapira  wrote:
> >
> > > YES PLEASE!
> > >
> > > On Tue, May 2, 2017 at 1:48 PM, dan  wrote:
> > >
> > > > hello.
> > > >
> > > > in an attempt to make the connect rest endpoints more useful i'd like
> > to
> > > > add the Connector type (Sink/Source) in our rest endpoints to make
> them
> > > > more self descriptive.
> > > >
> > > > KIP here:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 151+Expose+Connector+type+in+REST+API
> > > > initial pr: https://github.com/apache/kafka/pull/2960
> > > >
> > > > thanks
> > > > dan
> > > >
> > >
> > >
> > >
> > > --
> > > *Gwen Shapira*
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter  | blog
> > > 
> > >
> >
>


[jira] [Updated] (KAFKA-4291) TopicCommand --describe shows topics marked for deletion as under-replicated and unavailable

2017-05-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4291:
---
Fix Version/s: 0.11.0.0

> TopicCommand --describe shows topics marked for deletion as under-replicated 
> and unavailable
> 
>
> Key: KAFKA-4291
> URL: https://issues.apache.org/jira/browse/KAFKA-4291
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.0.1
>Reporter: Mickael Maison
>Assignee: Mickael Maison
> Fix For: 0.11.0.0
>
>
> When using kafka-topics.sh --describe with --under-replicated-partitions or 
> --unavailable-partitions, topics marked for deletion are listed.
> While this is debatable whether we want to list such topics this way, it 
> should at least print that the topic is marked for deletion, like --list 
> does. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4291) TopicCommand --describe shows topics marked for deletion as under-replicated and unavailable

2017-05-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4291:
---
Status: Patch Available  (was: Open)

> TopicCommand --describe shows topics marked for deletion as under-replicated 
> and unavailable
> 
>
> Key: KAFKA-4291
> URL: https://issues.apache.org/jira/browse/KAFKA-4291
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.0.1
>Reporter: Mickael Maison
>Assignee: Mickael Maison
> Fix For: 0.11.0.0
>
>
> When using kafka-topics.sh --describe with --under-replicated-partitions or 
> --unavailable-partitions, topics marked for deletion are listed.
> While this is debatable whether we want to list such topics this way, it 
> should at least print that the topic is marked for deletion, like --list 
> does. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4795) Confusion around topic deletion

2017-05-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000140#comment-16000140
 ] 

Ismael Juma commented on KAFKA-4795:


[~vahid], supposedly you may still see this if the Controller is busy 
processing other requests.

> Confusion around topic deletion
> ---
>
> Key: KAFKA-4795
> URL: https://issues.apache.org/jira/browse/KAFKA-4795
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>
> The topic deletion works like this in 0.10.2.0:
> # {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
> # {{bin/kafka-server-start.sh config/server.properties}} (uses default 
> {{server.properties}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1}} (creates the topic {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
> Previously, the last command above returned {{test - marked for deletion}}, 
> which matched the output statement of the {{--delete}} topic command.
> Continuing with the above scenario,
> # stop the broker
> # add the broker config {{delete.topic.enable=true}} in the config file
> # {{bin/kafka-server-start.sh config/server.properties}} (this does not 
> remove the topic {{test}}, as if the topic was never marked for deletion).
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
> (reports {{Topic test is marked for deletion. Note: This will have no impact 
> if delete.topic.enable is not set to true.}})
> # {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no 
> topics).
> It seems that the "marked for deletion" state for topics no longer exists.
> I opened this JIRA so I can get a confirmation on the expected topic deletion 
> behavior, because in any case, I think the user experience could be improved 
> (either there is a bug in the code, or the command's output statement is 
> misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


KIP-Template access

2017-05-07 Thread George Smith
The KIP-Template doesn’t show up when I click to create a new KIP page







My username is GeoSmith


Thanks,

George


[jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes

2017-05-07 Thread Di Shang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000185#comment-16000185
 ] 

Di Shang commented on KAFKA-3806:
-

This is really problematic for mirrormaker since it manages offset manually (no 
auto offset commit). 

We have mirrormaker setup with 'auto.offset.reset=earliest' because we want to 
replicate the entire topic from beginning, what we see is that if a partition 
received no new messages for 1 day, mirrormaker will re-replicate everything 
from that partition up to 6 days ago causing a lot of unnecessary duplicate 
msg. 

I don't see the point having default offsets.retention < log.retention. In 
general if a consumer go offline for 1 day, then when it comes back the default 
configuration can make it (depending on auto.offset.reset) either reprocess all 
messages up to 6 days ago or skip messages during the offline period. It's a 
trap likely to cause issues in many scenarios that will catch people until they 
realize that they have to configure offsets.retention >= log.retention 
explicitly, so why not make it the default anyway. 

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --
>
> Key: KAFKA-3806
> URL: https://issues.apache.org/jira/browse/KAFKA-3806
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Michal Turek
>Priority: Minor
>
> Combination of default values of log.retention.hours (168 hours = 7 days) and 
> offsets.retention.minutes (1440 minutes = 1 day) may be dangerous in special 
> cases. Offset retention should be always greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, 
> topic wasn't deleted.
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no 
> more incoming data and there was nothing to confirm. (We have auto-commit 
> disabled, I'm not sure how behaves enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to 
> offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it 
> didn't find any committed offsets for that topic since they were deleted by 
> offsets.retention.minutes so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 
> days of messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, 
> don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice 
> larger than log.retention.hours.
> - Check these values during Kafka startup and log a warning if 
> offsets.retention.minutes is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets 
> in ZooKeeper and Kafka (http://kafka.apache.org/documentation.html#upgrade).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-153 : Include only client traffic in BytesOutPerSec metric

2017-05-07 Thread Jun Rao
Hi, Xavier,

I think the per topic metric is more useful for measuring the traffic from
the clients. For internal replication, perhaps the aggregate value is
enough.

Thanks,

Jun

On Fri, May 5, 2017 at 6:11 PM, Xavier Léauté  wrote:

> Any reason we are not keeping the per-topic breakdown for inter-broker
> traffic?
>
>
> On Fri, May 5, 2017 at 4:52 PM Onur Karaman 
> wrote:
>
> > Looks good. Thanks!
> >
> > On Fri, May 5, 2017 at 4:44 PM, Roger Hoover 
> > wrote:
> >
> > > Very helpful.  Thank you, Jun.
> > >
> > > On Fri, May 5, 2017 at 4:42 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Jun,
> > > >
> > > > Thanks for the KIP, LGTM.
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, May 5, 2017 at 3:38 PM, Ismael Juma 
> wrote:
> > > >
> > > > > Thanks Jun, looks good to me.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Fri, May 5, 2017 at 11:35 PM, Jun Rao  wrote:
> > > > >
> > > > > > Hi, Ismael,
> > > > > >
> > > > > > Good point. Updated the KIP with ReplicationBytesInPerSec.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, May 5, 2017 at 3:16 PM, Ismael Juma 
> > > wrote:
> > > > > >
> > > > > > > Thanks for the KIP, Jun. Good to fix this inconsistency. Do I
> > > > > understand
> > > > > > > correctly that we are introducing ReplicationBytesOutPerSec,
> but
> > > not
> > > > > > > ReplicationBytesInPerSec?
> > > > > > > If so, what's the reason?
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Fri, May 5, 2017 at 11:11 PM, Jun Rao 
> > wrote:
> > > > > > >
> > > > > > > > Hi, Everyone,
> > > > > > > >
> > > > > > > > We created "KIP-153 : Include only client traffic in
> > > BytesOutPerSec
> > > > > > > > metric".
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > > > 153+%3A+Include+only+client+traffic+in+BytesOutPerSec+metric
> > > > > > > >
> > > > > > > > Please take a look and provide your feedback.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


[VOTE] KIP-153 : Include only client traffic in BytesOutPerSec metric

2017-05-07 Thread Jun Rao
Hi, Everyone,

I would like to start the voting process for KIP-153 : Include only client
traffic in BytesOutPerSec metric.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-
153+%3A+Include+only+client+traffic+in+BytesOutPerSec+metric

The vote will run for a minimum of 72 hours.

Thanks,

Jun


[jira] [Created] (KAFKA-5189) trunk unstable - DescribeConsumerGroupTest.testDescribeGroupWithNewConsumerWithShortInitializationTimeout fails 90% of times

2017-05-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-5189:
---

 Summary: trunk unstable - 
DescribeConsumerGroupTest.testDescribeGroupWithNewConsumerWithShortInitializationTimeout
 fails 90% of times
 Key: KAFKA-5189
 URL: https://issues.apache.org/jira/browse/KAFKA-5189
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt


ran the build 10 times on my machine, fails 90% of the time



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5189) trunk unstable - DescribeConsumerGroupTest.testDescribeGroupWithNewConsumerWithShortInitializationTimeout fails 90% of times

2017-05-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000197#comment-16000197
 ] 

Ismael Juma commented on KAFKA-5189:


Possibly related to KAFKA-4948.

> trunk unstable - 
> DescribeConsumerGroupTest.testDescribeGroupWithNewConsumerWithShortInitializationTimeout
>  fails 90% of times
> 
>
> Key: KAFKA-5189
> URL: https://issues.apache.org/jira/browse/KAFKA-5189
> Project: Kafka
>  Issue Type: Bug
>Reporter: radai rosenblatt
>
> ran the build 10 times on my machine, fails 90% of the time



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[VOTE] KIP-153 (separating replication traffic from BytesOutPerSec metric)

2017-05-07 Thread Jun Rao
Hi, Everyone,

Since this is a relatively simple change, I would like to start the voting
process for KIP-153 : Include only client traffic in BytesOutPerSec metric.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-153+%
3A+Include+only+client+traffic+in+BytesOutPerSec+metric

The vote will run for a minimum of 72 hours.

Thanks,

Jun


Re: [VOTE] KIP-153 (separating replication traffic from BytesOutPerSec metric)

2017-05-07 Thread Ismael Juma
Thanks for the KIP, Jun. +1 from me.

Ismael

On Mon, May 8, 2017 at 3:40 AM, Jun Rao  wrote:

> Hi, Everyone,
>
> Since this is a relatively simple change, I would like to start the voting
> process for KIP-153 : Include only client traffic in BytesOutPerSec metric.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-153+%
> 3A+Include+only+client+traffic+in+BytesOutPerSec+metric
>
> The vote will run for a minimum of 72 hours.
>
> Thanks,
>
> Jun
>


Re: [DISCUSS] KIP-133: List and Alter Configs Admin APIs

2017-05-07 Thread Ismael Juma
Thanks for the feedback Colin. Comments inline.

On Sun, May 7, 2017 at 9:29 PM, Colin McCabe  wrote:
>
> Hmm.  What's the behavior if I try to list the configuration for a topic
> that doesn't exist?  It seems like in this case, the whole request has
> to return an error, and nothing gets listed.  Shouldn't the protocol
> should return an error code and message per element in the batch,
> similar to how the other batch protocols work (CreateTopics,
> DeleteTopics, etc. etc.)
>

CreateTopics and DeleteTopics are more like AlterConfigs and that has an
error code per batch. For requests that mutate, this is essential because
the operations are not transactional. I followed the same pattern as
ListGroups, but that doesn't take filters, so MetadataRequest is a better
example. For consistency with that, I added an error code per batch.

We also should think about the case where someone requests
> configurations from multiple brokers.  Since there are multiple requests
> sent over the wire in this case, there is the chance for some of them to
> fail when others have succeeded.  So the API needs a way of returning an
> error per batch element.
>

Yeah, that's true. For the AdminClient side, I followed the same pattern as
ListTopicsResponse, but it seems like DescribeTopicsResults is a better
example. I named this request ListConfigs for consistency with ListAcls,
but it looks like they really should be DescribeConfigs and DescribeAcls.

On the other hand, with configurations, each topic will have a single
> configuration, never more and never less.  Each cluster will have a
> single configuration-- never more and never less.  So having separate
> Configuration resources doesn't seem to add any value, since they will
> always map 1:1 to existing resources.
>

Good point. I thought about your proposal in more depth and I agree that it
solves the issue in a nice way. I updated the KIP.

I guess my question is more conceptual-- if these things are all
> resources, shouldn't we have a single type to model all of them?  We
> might want to add configurations to other resources later on as well.
>

I've been thinking about how we could satisfy the 2 requirements of having
a general resource type while making it clear which values are allowed for
a given operation. I updated the KIP to use a shared resource type in the
wire, renamed entity to resource, but kept a separate class and enum for
ConfigResource and ConfigResource.Type. They map trivially to Resource and
ResourceType.

Also, I realised that I was a bit hasty when I changed Config to
Collection in the signature of a few methods. I think Config
is the right type. It's a container for a Collection of ConfigEntry
instances so that we can provide useful methods to work with the configs
(e.g. exclude items with defaults, etc.).

Here's the full diff of the changes:

https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=68719318&selectedPageVersions=14&selectedPageVersions=11

Given that we don't have much time until the KIP freeze and this is an
important KIP to make the AdminClient truly useful, I will start the vote
thread. That said, don't hesitate to provide additional feedback.

Thanks.
Ismael


Re: [VOTE] KIP-153 (separating replication traffic from BytesOutPerSec metric)

2017-05-07 Thread James Cheng
+1 (non-binding)

James

> On May 7, 2017, at 10:59 PM, Ismael Juma  wrote:
> 
> Thanks for the KIP, Jun. +1 from me.
> 
> Ismael
> 
> On Mon, May 8, 2017 at 3:40 AM, Jun Rao  wrote:
> 
>> Hi, Everyone,
>> 
>> Since this is a relatively simple change, I would like to start the voting
>> process for KIP-153 : Include only client traffic in BytesOutPerSec metric.
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-153+%
>> 3A+Include+only+client+traffic+in+BytesOutPerSec+metric
>> 
>> The vote will run for a minimum of 72 hours.
>> 
>> Thanks,
>> 
>> Jun
>> 



[jira] [Updated] (KAFKA-3878) Exponential backoff for broker reconnect attempts

2017-05-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3878:
---
Fix Version/s: 0.11.0.0

> Exponential backoff for broker reconnect attempts
> -
>
> Key: KAFKA-3878
> URL: https://issues.apache.org/jira/browse/KAFKA-3878
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, network
>Reporter: Dana Powers
>Assignee: Dana Powers
>  Labels: kip
> Fix For: 0.11.0.0
>
>
> The client currently uses a constant backoff policy, configured via 
> 'reconnect.backoff.ms' . To reduce network load during longer broker outages, 
> it would be useful to support an optional exponentially increasing backoff 
> policy.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3878) Exponential backoff for broker reconnect attempts

2017-05-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3878:
---
Assignee: Dana Powers  (was: Ismael Juma)
  Status: Patch Available  (was: Open)

> Exponential backoff for broker reconnect attempts
> -
>
> Key: KAFKA-3878
> URL: https://issues.apache.org/jira/browse/KAFKA-3878
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, network
>Reporter: Dana Powers
>Assignee: Dana Powers
>  Labels: kip
>
> The client currently uses a constant backoff policy, configured via 
> 'reconnect.backoff.ms' . To reduce network load during longer broker outages, 
> it would be useful to support an optional exponentially increasing backoff 
> policy.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4982) Add listener tag to socket-server-metrics.connection-... metrics

2017-05-07 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000223#comment-16000223
 ] 

Ismael Juma commented on KAFKA-4982:


[~ecomar], are you planning to submit a PR for this soon? The KIP has enough 
votes so it's just a matter of time for it to pass and there isn't much time 
until the feature freeze.

> Add listener tag to socket-server-metrics.connection-... metrics  
> --
>
> Key: KAFKA-4982
> URL: https://issues.apache.org/jira/browse/KAFKA-4982
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
> Fix For: 0.11.0.0
>
>
> Metrics in socket-server-metrics like connection-count connection-close-rate 
> etc are tagged with networkProcessor:
> where the id of a network processor is just a numeric integer.
> If you have more than one listener (eg PLAINTEXT, SASL_SSL, etc.), the id 
> just keeps incrementing and when looking at the metrics it is hard to match 
> the metric tag to a listener. 
> You need to know the number of network threads and the order in which the 
> listeners are declared in the brokers' server.properties.
> We should add a tag showing the listener label, that would also make it much 
> easier to group the metrics in a tool like grafana



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[VOTE] KIP-133: List and Alter Configs Admin APIs

2017-05-07 Thread Ismael Juma
Hi everyone,

I believe I addressed the comments in the discussion thread and given the
impending KIP freeze, I would like to start the voting process for KIP-133:
List and Alter Configs Admin APIs:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-
133%3A+List+and+Alter+Configs+Admin+APIs

As mentioned previously, this KIP and KIP-140 (Add administrative RPCs for
adding, deleting, and listing ACLs) complete the AdminClient work that was
originally proposed as part KIP-4.

If you have additional feedback, please share it in the discuss thread.

The vote will run for a minimum of 72 hours.

Thanks,
Ismael


[VOTE] KIP-133: List and Alter Configs Admin APIs (second attempt)

2017-05-07 Thread Ismael Juma
[Seems like the original message ended up in the discuss thread in GMail,
so trying again]

Hi everyone,

I believe I addressed the comments in the discussion thread and given the
impending KIP freeze, I would like to start the voting process for KIP-133:
List and Alter Configs Admin APIs:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%
3A+List+and+Alter+Configs+Admin+APIs

As mentioned previously, this KIP and KIP-140 (Add administrative RPCs for
adding, deleting, and listing ACLs) complete the AdminClient work that was
originally proposed as part KIP-4.

If you have additional feedback, please share it in the discuss thread.

The vote will run for a minimum of 72 hours.

Thanks,
Ismael


[jira] [Created] (KAFKA-5190) builds with low parallelism exhaust system open files and crash

2017-05-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-5190:
---

 Summary: builds with low parallelism exhaust system open files and 
crash
 Key: KAFKA-5190
 URL: https://issues.apache.org/jira/browse/KAFKA-5190
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt


in an attempt to produce more stable builds, i tried reducing the parallelism:
```
export GRADLE_OPTS=-Dorg.gradle.daemon=false
./gradlew clean build -PmaxParallelForks=1
```
which made things much worse - i now have builds that fail due to hitting the 
system maximum number of open files (4096 in my case).

this happens with or without the gradle daemon



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-153 (separating replication traffic from BytesOutPerSec metric)

2017-05-07 Thread Sriram Subramanian
+1

> On May 7, 2017, at 7:40 PM, Jun Rao  wrote:
> 
> Hi, Everyone,
> 
> Since this is a relatively simple change, I would like to start the voting
> process for KIP-153 : Include only client traffic in BytesOutPerSec metric.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-153+%
> 3A+Include+only+client+traffic+in+BytesOutPerSec+metric
> 
> The vote will run for a minimum of 72 hours.
> 
> Thanks,
> 
> Jun


Re: [VOTE] KIP-133: List and Alter Configs Admin APIs (second attempt)

2017-05-07 Thread Sriram Subramanian
+1

> On May 7, 2017, at 9:01 PM, Ismael Juma  wrote:
> 
> [Seems like the original message ended up in the discuss thread in GMail,
> so trying again]
> 
> Hi everyone,
> 
> I believe I addressed the comments in the discussion thread and given the
> impending KIP freeze, I would like to start the voting process for KIP-133:
> List and Alter Configs Admin APIs:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%
> 3A+List+and+Alter+Configs+Admin+APIs
> 
> As mentioned previously, this KIP and KIP-140 (Add administrative RPCs for
> adding, deleting, and listing ACLs) complete the AdminClient work that was
> originally proposed as part KIP-4.
> 
> If you have additional feedback, please share it in the discuss thread.
> 
> The vote will run for a minimum of 72 hours.
> 
> Thanks,
> Ismael


Re: [DISCUSS] KIP-133: List and Alter Configs Admin APIs

2017-05-07 Thread James Cheng
The KIP talks about allowing the user to provide a AlterConfigsPolicy. I assume 
that even if the user does not provide a custom policy, that there are some 
basic validation that the broker will do by default?

The reason I'm asking is I'm thinking ahead to 
https://issues.apache.org/jira/browse/KAFKA-4680, and wanted to know if there 
is going to be central place where we can do checks for stuff like that.

-James

> On May 4, 2017, at 10:32 PM, Ismael Juma  wrote:
> 
> Hi all,
> 
> We've posted "KIP-133: List and Alter Configs Admin APIs" for discussion:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+List+and+Alter+Configs+Admin+APIs
> 
> This completes the first batch of AdminClient APIs so that topic, config
> and ACL management is supported.
> 
> Please take a look. Your feedback is appreciated.
> 
> Thanks,
> Ismael



[jira] [Commented] (KAFKA-3480) Autogenerate metrics documentation

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000256#comment-16000256
 ] 

ASF GitHub Bot commented on KAFKA-3480:
---

Github user wushujames closed the pull request at:

https://github.com/apache/kafka/pull/1202


> Autogenerate metrics documentation
> --
>
> Key: KAFKA-3480
> URL: https://issues.apache.org/jira/browse/KAFKA-3480
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: James Cheng
> Attachments: sample_metrics.html, Screen Shot 2016-04-07 at 6.52.19 
> PM.png
>
>
> Metrics documentation is done manually, which means it's hard to keep it 
> current. It would be nice to have automatic generation of this documentation 
> as we have for configs.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #1202: KAFKA-3480: Autogenerate metrics documentation

2017-05-07 Thread wushujames
Github user wushujames closed the pull request at:

https://github.com/apache/kafka/pull/1202


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-07 Thread James Cheng (JIRA)
James Cheng created KAFKA-5191:
--

 Summary: Autogenerate Consumer Fetcher metrics
 Key: KAFKA-5191
 URL: https://issues.apache.org/jira/browse/KAFKA-5191
 Project: Kafka
  Issue Type: Sub-task
Reporter: James Cheng






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-07 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng reassigned KAFKA-5191:
--

Assignee: James Cheng

> Autogenerate Consumer Fetcher metrics
> -
>
> Key: KAFKA-5191
> URL: https://issues.apache.org/jira/browse/KAFKA-5191
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-07 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5191 started by James Cheng.
--
> Autogenerate Consumer Fetcher metrics
> -
>
> Key: KAFKA-5191
> URL: https://issues.apache.org/jira/browse/KAFKA-5191
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101

2017-05-07 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000261#comment-16000261
 ] 

Onur Karaman commented on KAFKA-5099:
-

By the way, I reran the file descriptor experiment against both my patch as 
well as against git hash b611cfa5c0f4941a491781424afd9b699bdb894e (the commit 
before KIP-101: 0baea2ac13532981f3fea11e5dfc6da5aafaeaa8) on both mac and 
linux. The file descriptors stick around while the index files have been 
removed for over 10 minutes so I gave up waiting.

> Replica Deletion Regression from KIP-101
> 
>
> Key: KAFKA-5099
> URL: https://issues.apache.org/jira/browse/KAFKA-5099
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> It appears that replica deletion regressed from KIP-101. Replica deletion 
> happens when a broker receives a StopReplicaRequest with delete=true. Ever 
> since KAFKA-1911, replica deletion has been async, meaning the broker 
> responds with a StopReplicaResponse simply after marking the replica 
> directory as staged for deletion. This marking happens by moving a data log 
> directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked 
> directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, 
> acting as a soft-delete. A scheduled thread later actually deletes the data. 
> It appears that the regression occurs while the scheduled thread is actually 
> trying to delete the data, which means the controller considers operations 
> such as partition reassignment and topic deletion complete. But if you look 
> at the log4j logs and data logs, you'll find that the soft-deleted data logs 
> haven't actually won't get deleted. It seems that restarting the broker 
> actually allows for the soft-deleted directories to get deleted.
> Here's the setup:
> {code}
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh 
> > config/server1.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0
> > cat p.txt
> {"partitions":
>  [
>   {"topic": "t1", "partition": 0, "replicas": [0] }
>  ],
> "version":1
> }
> > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> > --reassignment-json-file p.txt --execute
> {code}
> Here are sample logs:
> {code}
> [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager)
> [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is 
> scheduled for deletion (kafka.log.LogManager)
> [2017-04-20 17:47:27,585] INFO Deleting index 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index
>  (kafka.log.OffsetIndex)
> [2017-04-20 17:47:27,586] INFO Deleting index 
> /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex)
> [2017-04-20 17:47:27,587] ERROR Exception in deleting 
> Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it 
> to the end of the queue. (kafka.log.LogManager)
> java.io.FileNotFoundException: 
> /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.(FileOutputStream.java:213)
>   at java.io.FileOutputStream.(FileOutputStream.java:162)
>   at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41)
>   at 
> kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61)
>   at 
> kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at 
> kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159)
>   at kafka.log.Log.delete(Log.scala:1051)
>   at 
> kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(

[jira] [Resolved] (KAFKA-5172) CachingSessionStore doesn't fetchPrevious correctly.

2017-05-07 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5172.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.0

Issue resolved by pull request 2972
[https://github.com/apache/kafka/pull/2972]

> CachingSessionStore doesn't fetchPrevious correctly.
> 
>
> Key: KAFKA-5172
> URL: https://issues.apache.org/jira/browse/KAFKA-5172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
> Fix For: 0.11.0.0
>
>
> When using KStreamSessionWindowAggregate by calling 
> KGroupedStream#aggregate() a CachingSessionStore is created.
> This causes the following chain of method calls when a new record that 
> requires removing others from the store appear:
> KStreamSessionWindowAggregate
> CachingSessionStore.remove(Windowed)
> CachingSessionStore.put(Windowed, V)
> ThreadCache.put(String, Bytes *containing Windowed info*, LRUCacheEntry)
> ThreadCache.maybeEvict(String)
> NamedCache.evict()
> NamedCache.flush(LRUNode *containing Bytes and LRUCacheEntry from 
> ThreadCache#put*)
> DirtyEntryFlushListener *defined in CachingSessionStore line 80* 
> .apply(ThreadCache.DirtyEntry *containing Bytes and LRUCacheEntry from 
> ThreadCache#put*)
> CachingSessionStore.putAndMaybeForward(ThreadCache.DirtyEntry *containing 
> Bytes and LRUCacheEntry from ThreadCache#put*, InternalProcessorContext)
> CachingSessionStore.fetchPrevious(Bytes *containing Windowed info*)
> RocksDBSessionStore.fetch(Bytes *containing Windowed info*)
> RocksDBSessionStore.findSessions *on line 48* (Bytes *containing Windowed 
> info*, 0, Long.MAX_VALUE)
> MeteredSegmentedByteStore.fetch(Bytes *containing Windowed info*, 0, 
> Long.MAX_VALUE)
> ChangeLoggingSegmentedByteStore.fetch(Bytes *containing Windowed info*, 0, 
> Long.MAX_VALUE)
> RocksDBSegmentedBytesStore.fetch(Bytes *containing Windowed info*, 0, 
> Long.MAX_VALUE)
> SessionKeySchema.lower/upperRange(Bytes *containing Windowed info*, Long)
> ** in this method the already Windowed gets Windowed again *
> The point of showing all this is to point out that the windowed gets windowed 
> and because it passes the 0, Long.MAX_VALUE it searches for a strange key and 
> searches all times for it. I think the fetchPrevious method of 
> CachingSessionStore should be changed to call the 
> byteStores.findSessions(Bytes.wrap(serdes.rawKey(key.key())), 
> key.window().start(), key.window().end()). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2972: KAFKA-5172: Fix fetchPrevious to find the correct ...

2017-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2972


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5172) CachingSessionStore doesn't fetchPrevious correctly.

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000278#comment-16000278
 ] 

ASF GitHub Bot commented on KAFKA-5172:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2972


> CachingSessionStore doesn't fetchPrevious correctly.
> 
>
> Key: KAFKA-5172
> URL: https://issues.apache.org/jira/browse/KAFKA-5172
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Kyle Winkelman
>Assignee: Kyle Winkelman
> Fix For: 0.11.0.0
>
>
> When using KStreamSessionWindowAggregate by calling 
> KGroupedStream#aggregate() a CachingSessionStore is created.
> This causes the following chain of method calls when a new record that 
> requires removing others from the store appear:
> KStreamSessionWindowAggregate
> CachingSessionStore.remove(Windowed)
> CachingSessionStore.put(Windowed, V)
> ThreadCache.put(String, Bytes *containing Windowed info*, LRUCacheEntry)
> ThreadCache.maybeEvict(String)
> NamedCache.evict()
> NamedCache.flush(LRUNode *containing Bytes and LRUCacheEntry from 
> ThreadCache#put*)
> DirtyEntryFlushListener *defined in CachingSessionStore line 80* 
> .apply(ThreadCache.DirtyEntry *containing Bytes and LRUCacheEntry from 
> ThreadCache#put*)
> CachingSessionStore.putAndMaybeForward(ThreadCache.DirtyEntry *containing 
> Bytes and LRUCacheEntry from ThreadCache#put*, InternalProcessorContext)
> CachingSessionStore.fetchPrevious(Bytes *containing Windowed info*)
> RocksDBSessionStore.fetch(Bytes *containing Windowed info*)
> RocksDBSessionStore.findSessions *on line 48* (Bytes *containing Windowed 
> info*, 0, Long.MAX_VALUE)
> MeteredSegmentedByteStore.fetch(Bytes *containing Windowed info*, 0, 
> Long.MAX_VALUE)
> ChangeLoggingSegmentedByteStore.fetch(Bytes *containing Windowed info*, 0, 
> Long.MAX_VALUE)
> RocksDBSegmentedBytesStore.fetch(Bytes *containing Windowed info*, 0, 
> Long.MAX_VALUE)
> SessionKeySchema.lower/upperRange(Bytes *containing Windowed info*, Long)
> ** in this method the already Windowed gets Windowed again *
> The point of showing all this is to point out that the windowed gets windowed 
> and because it passes the 0, Long.MAX_VALUE it searches for a strange key and 
> searches all times for it. I think the fetchPrevious method of 
> CachingSessionStore should be changed to call the 
> byteStores.findSessions(Bytes.wrap(serdes.rawKey(key.key())), 
> key.window().start(), key.window().end()). 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2993: KAFKA-5191: Autogenerate Consumer Fetcher metrics

2017-05-07 Thread wushujames
GitHub user wushujames opened a pull request:

https://github.com/apache/kafka/pull/2993

KAFKA-5191: Autogenerate Consumer Fetcher metrics

Autogenerate docs for the Consumer Fetcher's metrics. This is a smaller 
subset of the original PR https://github.com/apache/kafka/pull/1202.

CC @ijuma @benstopford @hachikuji 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wushujames/kafka fetcher_metrics_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2993.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2993


commit edcfde98d50b6cb46a0eaf19d61ad5f19dd8fd8b
Author: James Cheng 
Date:   2017-05-07T05:33:48Z

Base files, to support auto-generating metrics documentation.

commit 9c0becef44e36f69dd3a141e4d2bce4694eb29b7
Author: James Cheng 
Date:   2017-05-07T06:46:54Z

Autogenerate metrics docs for the Fetcher




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000279#comment-16000279
 ] 

ASF GitHub Bot commented on KAFKA-5191:
---

GitHub user wushujames opened a pull request:

https://github.com/apache/kafka/pull/2993

KAFKA-5191: Autogenerate Consumer Fetcher metrics

Autogenerate docs for the Consumer Fetcher's metrics. This is a smaller 
subset of the original PR https://github.com/apache/kafka/pull/1202.

CC @ijuma @benstopford @hachikuji 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wushujames/kafka fetcher_metrics_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2993.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2993


commit edcfde98d50b6cb46a0eaf19d61ad5f19dd8fd8b
Author: James Cheng 
Date:   2017-05-07T05:33:48Z

Base files, to support auto-generating metrics documentation.

commit 9c0becef44e36f69dd3a141e4d2bce4694eb29b7
Author: James Cheng 
Date:   2017-05-07T06:46:54Z

Autogenerate metrics docs for the Fetcher




> Autogenerate Consumer Fetcher metrics
> -
>
> Key: KAFKA-5191
> URL: https://issues.apache.org/jira/browse/KAFKA-5191
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-07 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng updated KAFKA-5191:
---
Reviewer: Ismael Juma
  Status: Patch Available  (was: In Progress)

> Autogenerate Consumer Fetcher metrics
> -
>
> Key: KAFKA-5191
> URL: https://issues.apache.org/jira/browse/KAFKA-5191
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5191) Autogenerate Consumer Fetcher metrics

2017-05-07 Thread James Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng updated KAFKA-5191:
---
Attachment: generated_fetcher_docs.png

> Autogenerate Consumer Fetcher metrics
> -
>
> Key: KAFKA-5191
> URL: https://issues.apache.org/jira/browse/KAFKA-5191
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
> Attachments: generated_fetcher_docs.png
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk8 #1490

2017-05-07 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-5172: Fix fetchPrevious to find the correct session

--
[...truncated 857.71 KB...]
kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 
testMetricsDuringTopicCreateDelete STARTED

kafka.integration.MetricsDuringTopicCreationDeletionTest > 
testMetricsDuringTopicCreateDelete PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integratio