Re: [DISCUSS] KIP-544: Make metrics exposed via JMX configurable

2019-11-05 Thread Viktor Somogyi-Vass
Hi Xavier,

That's certainly an option, however it does not solve the problem for our
users that still rely on JMX integration to collect metrics.

Absolutely.

We already provide the ability to write reporter plugins via the
MetricsReporter interface.
And rather than building a separate HTTP interface, I think we should
extend the MetricsReporter interface to also
provide access to yammer metrics – not just Kafka metrics – since there is
no clear effort to move away from Yammer at this time.

This way one could build any kind of reporter – HTTP or otherwise – without
having to rely on Kafka internal classes

Yes, as you point out, it's important to decouple the metric reporter from
internal classes, and exposing the Yammer metrics would be a good step toward that.
From this perspective the REST API goes one step further, as you won't have
to ship the broker and the reporter plugin together.
Anyway, I don't want to derail the conversation here with the REST stuff
(perhaps I'll open a KIP for that sometime and we can discuss it there :) ).

Thanks,
Viktor
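
For context, the plugin route discussed above looks roughly like the sketch
below: a reporter registered on the broker via the metric.reporters config.
The class name and output target are illustrative only, and note that such a
plugin currently receives only Kafka metrics, not the broker's Yammer metrics,
which is exactly the gap Xavier describes.

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Illustrative sketch of a reporter plugin; registered on the broker via
// metric.reporters=com.example.LoggingMetricsReporter (class name is an example).
public class LoggingMetricsReporter implements MetricsReporter {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void init(List<KafkaMetric> metrics) {
        // Called once with the metrics that already exist when the reporter is registered.
        metrics.forEach(this::metricChange);
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Only Kafka metrics arrive here; the broker's Yammer metrics are not exposed,
        // which is the limitation discussed in this thread.
        System.out.printf("metric added/updated: %s.%s%n",
                metric.metricName().group(), metric.metricName().name());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        System.out.printf("metric removed: %s.%s%n",
                metric.metricName().group(), metric.metricName().name());
    }

    @Override
    public void close() { }
}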


On Wed, Oct 30, 2019 at 10:44 PM Xavier Léauté  wrote:

> >
> > A follow-up question, maybe to list in the future work section as it's
> > somewhat parallel to this KIP: have you thought about implementing a REST
> > reporter for metrics?
>
>
> That's certainly an option, however it does not solve the problem for our
> users that still rely on JMX integration to collect metrics.
>
> We already provide the ability to write reporter plugins via the
> MetricsReporter interface.
> And rather than building a separate HTTP interface, I think we should
> extend the MetricsReporter interface to also
> provide access to yammer metrics – not just Kafka metrics – since there is
> no clear effort to move away from Yammer at this time.
>
> This way one could build any kind of reporter – HTTP or otherwise – without
> having to rely on Kafka internal classes
>


Re: [VOTE] KIP-523 Add KStream#toTable to the Streams DSL

2019-11-05 Thread aishwarya kumar
Thank you so much for the votes.

I will consider the KIP accepted, with 3 binding votes from
Matthias, Bill, and Guozhang, and 1 non-binding vote from John.


Best,
Aishwarya


On Mon, Nov 4, 2019, 12:18 PM Guozhang Wang  wrote:

> +1 (binding), thanks Aishwarya!
>
> On Sun, Nov 3, 2019 at 11:46 AM aishwarya kumar 
> wrote:
>
> > This thread has been open for more than 72 hours. So far there are 2
> > binding and 1 non-binding votes, looking to conclude this quickly!!
> >
> > Best,
> > Aishwarya
> >
> > On Mon, Oct 28, 2019 at 5:00 PM John Roesler  wrote:
> >
> > > Thanks, Aishwarya!
> > >
> > > I'm +1 (non-binding)
> > >
> > > -John
> > >
> > > On Mon, Oct 28, 2019 at 11:58 AM aishwarya kumar 
> > > wrote:
> > > >
> > > > Thank you,
> > > >
> > > > Two binding votes so far.
> > > >
> > > > I'll keep this thread open for a couple of days.
> > > >
> > > > Best,
> > > > Aishwarya
> > > >
> > > > On Thu, Oct 24, 2019, 3:05 PM Bill Bejeck  wrote:
> > > >
> > > > > Thanks for the KIP, this is something that will be appreciated by
> the
> > > > > community.
> > > > >
> > > > > +1(binding)
> > > > >
> > > > > -Bill
> > > > >
> > > > > On Thu, Oct 24, 2019 at 12:54 PM Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 10/24/19 6:19 AM, aishwarya kumar wrote:
> > > > > > > Hello All,
> > > > > > >
> > > > > > > After concluding discussions for this KIP, I would like to go
> > > forward
> > > > > > with
> > > > > > > the voting process.
> > > > > > >
> > > > > > > Jira Ticket: https://issues.apache.org/jira/browse/KAFKA-7658
> > > > > > > KIP :
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
> > > > > > >
> > > > > > > Thank you,
> > > > > > > Aishwarya
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> >
>
>
> --
> -- Guozhang
>


Re: Subject: [VOTE] 2.2.2 RC2

2019-11-05 Thread Mickael Maison
+1 (non binding)
I verified signatures, built it from source, ran unit tests and quickstart



On Fri, Oct 25, 2019 at 3:10 PM Randall Hauch  wrote:
>
> Hello all, we identified around three dozen bug fixes, including an update
> of a third party dependency, and wanted to release a patch release for the
> Apache Kafka 2.2.0 release.
>
> This is the *second* candidate for release of Apache Kafka 2.2.2. (RC1 did
> not include a fix for https://issues.apache.org/jira/browse/KAFKA-9053, but
> the fix appeared before RC1 was announced so it was easier to just create
> RC2.)
>
> Check out the release notes for a complete list of the changes in this
> release candidate:
> https://home.apache.org/~rhauch/kafka-2.2.2-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Wednesday, October 30, 9am PT>
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~rhauch/kafka-2.2.2-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~rhauch/kafka-2.2.2-rc2/javadoc/
>
> * Tag to be voted upon (off 2.2 branch) is the 2.2.2 tag:
> https://github.com/apache/kafka/releases/tag/2.2.2-rc2
>
> * Documentation:
> https://kafka.apache.org/22/documentation.html
>
> * Protocol:
> https://kafka.apache.org/22/protocol.html
>
> * Successful Jenkins builds for the 2.2 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/1/
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka/job/2.2/216/
>
> /**
>
> Thanks,
>
> Randall Hauch


Re: [DISCUSS] KIP-536: Propagate broker timestamp to Admin API

2019-11-05 Thread Noa Resare
It would certainly be possible to have the field be optional and only include 
it if some flag is set in the DescribeClusterOptions instance passed to 
Admin.describeCluster() that in turn would translate to a boolean in 
MetadataRequest indicating that we are asking for this piece of information.

I’m not entirely sure that this extra complexity would be worth it for the 
modestly smaller response size, in a message that already contains things like 
the hostname and rack identifier per broker.

/noa
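
To make the shape of that option concrete, the opt-in might look roughly like
the sketch below from the caller's side. Note that includeBrokerStartTime() and
Node#startTimeMs() are hypothetical names used purely for illustration; they are
not part of the current AdminClient API, and `admin` is assumed to be an
existing Admin instance.

// Hypothetical sketch only: neither includeBrokerStartTime(...) nor Node#startTimeMs()
// exists today; they just illustrate the opt-in flag described above.
DescribeClusterOptions options = new DescribeClusterOptions()
        .includeBrokerStartTime(true);   // would map to a boolean field in MetadataRequest

DescribeClusterResult result = admin.describeCluster(options);
for (Node node : result.nodes().get()) {
    System.out.printf("broker %d started at %d%n", node.id(), node.startTimeMs());
}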

> On 4 Nov 2019, at 14:45, Gwen Shapira  wrote:
> 
> Cluster metadata is sent to clients on a very regular basis. Adding
> start-time there seems quite repetitive. Especially considering that
> this information is only useful in very specific cases.
> 
> Can we add this capability to the Admin API in a way that won't impact
> normal client workflow?
> 
> On Mon, Nov 4, 2019 at 4:05 AM Noa Resare  wrote:
>> 
>> Thank you for the feedback, Stanislav!
>> 
>> I agree that it would be good to harmonise the naming, and start-time-ms 
>> definitely more descriptive.
>> 
>> I have updated the proposal to reflect this, and also added the updated json 
>> RPC changes. Please have a look.
>> 
>> /noa
>> 
>>> On 1 Nov 2019, at 09:13, Stanislav Kozlovski  wrote:
>>> 
>>> Hey Noa,
>>> 
>>> KIP-436 added a JMX metric in Kafka for this exact use-case, called
>>> `start-time-ms`. Perhaps it would be useful to name this public interface
>>> in the same way for consistency.
>>> 
>>> Could you update the KIP to include the specific RPC changes regarding the
>>> metadata request/responses? Here is a recent example of how to portray the
>>> changes -
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response
>>> 
>>> Thanks,
>>> Stanislav!
>>> 
>>> On Mon, Oct 14, 2019 at 2:46 PM Noa Resare  wrote:
>>> 
 We are in the process of migrating the pieces of automation that currently
 reads and modifies zookeeper state to use the Admin API.
 
 One of the things that we miss doing this is access to the start time of
 brokers in a cluster which is used by our automation doing rolling
 restarts. We currently read this from the timestamp field from the
 epehmeral broker znodes in zookeeper. To address this limitation, I have
 authored KIP-536, that proposes adding a timestamp field to the Node class
 that the AdminClient.describeCluster() method returns.
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-536%3A+Propagate+broker+timestamp+to+Admin+API
 <
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-536:+Propagate+broker+timestamp+to+Admin+API
> 
 
 Any and all feedback is most welcome
 
 cheers
 noa
>>> 
>>> 
>>> 
>>> --
>>> Best,
>>> Stanislav
>> 



Re: [DISCUSS] KIP-536: Propagate broker timestamp to Admin API

2019-11-05 Thread Gwen Shapira
It isn't just about saving space. It increases complexity to default to
always sharing a bit of information that is really only needed in a single
use-case.
We avoid doing this as a matter of good protocol design.
Arguably, this should not really piggyback on cluster metadata at all,
since the usage is so different.

On Tue, Nov 5, 2019, 7:29 AM Noa Resare  wrote:

> It would certainly be possible to have the field be optional and only
> include it if some flag is set in the DescribeClusterOptions instance
> passed to Admin.describeCluster() that in turn would translate to a boolean
> in MetadataRequest indicating that we are asking for this piece of
> information.
>
> I’m not entirely sure that this extra complexity would be worth it for the
> modestly smaller response size, in a message that already contains things
> like the hostname and rack identifier per broker.
>
> /noa
>
> > On 4 Nov 2019, at 14:45, Gwen Shapira  wrote:
> >
> > Cluster metadata is sent to clients on a very regular basis. Adding
> > start-time there seems quite repetitive. Especially considering that
> > this information is only useful in very specific cases.
> >
> > Can we add this capability to the Admin API in a way that won't impact
> > normal client workflow?
> >
> > On Mon, Nov 4, 2019 at 4:05 AM Noa Resare  wrote:
> >>
> >> Thank you for the feedback, Stanislav!
> >>
> >> I agree that it would be good to harmonise the naming, and
> start-time-ms definitely more descriptive.
> >>
> >> I have updated the proposal to reflect this, and also added the updated
> json RPC changes. Please have a look.
> >>
> >> /noa
> >>
> >>> On 1 Nov 2019, at 09:13, Stanislav Kozlovski 
> wrote:
> >>>
> >>> Hey Noa,
> >>>
> >>> KIP-436 added a JMX metric in Kafka for this exact use-case, called
> >>> `start-time-ms`. Perhaps it would be useful to name this public
> interface
> >>> in the same way for consistency.
> >>>
> >>> Could you update the KIP to include the specific RPC changes regarding
> the
> >>> metadata request/responses? Here is a recent example of how to portray
> the
> >>> changes -
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response
> >>>
> >>> Thanks,
> >>> Stanislav!
> >>>
> >>> On Mon, Oct 14, 2019 at 2:46 PM Noa Resare  wrote:
> >>>
>  We are in the process of migrating the pieces of automation that
> currently
>  reads and modifies zookeeper state to use the Admin API.
> 
>  One of the things that we miss doing this is access to the start time
> of
>  brokers in a cluster which is used by our automation doing rolling
>  restarts. We currently read this from the timestamp field from the
>  epehmeral broker znodes in zookeeper. To address this limitation, I
> have
>  authored KIP-536, that proposes adding a timestamp field to the Node
> class
>  that the AdminClient.describeCluster() method returns.
> 
> 
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536%3A+Propagate+broker+timestamp+to+Admin+API
>  <
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536:+Propagate+broker+timestamp+to+Admin+API
> >
> 
>  Any and all feedback is most welcome
> 
>  cheers
>  noa
> >>>
> >>>
> >>>
> >>> --
> >>> Best,
> >>> Stanislav
> >>
>
>


Re: Subject: [VOTE] 2.2.2 RC2

2019-11-05 Thread Randall Hauch
Thanks, Mickael!

Anyone else get a chance to validate the 2.2.2 RC2 build? It'd be great to
get this out the door.

Randall

On Tue, Nov 5, 2019 at 6:34 AM Mickael Maison 
wrote:

> +1 (non binding)
> I verified signatures, built it from source, ran unit tests and quickstart
>
>
>
> On Fri, Oct 25, 2019 at 3:10 PM Randall Hauch  wrote:
> >
> > Hello all, we identified around three dozen bug fixes, including an
> update
> > of a third party dependency, and wanted to release a patch release for
> the
> > Apache Kafka 2.2.0 release.
> >
> > This is the *second* candidate for release of Apache Kafka 2.2.2. (RC1
> did
> > not include a fix for https://issues.apache.org/jira/browse/KAFKA-9053,
> but
> > the fix appeared before RC1 was announced so it was easier to just create
> > RC2.)
> >
> > Check out the release notes for a complete list of the changes in this
> > release candidate:
> > https://home.apache.org/~rhauch/kafka-2.2.2-rc2/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Wednesday, October 30, 9am PT>
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~rhauch/kafka-2.2.2-rc2/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~rhauch/kafka-2.2.2-rc2/javadoc/
> >
> > * Tag to be voted upon (off 2.2 branch) is the 2.2.2 tag:
> > https://github.com/apache/kafka/releases/tag/2.2.2-rc2
> >
> > * Documentation:
> > https://kafka.apache.org/22/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/22/protocol.html
> >
> > * Successful Jenkins builds for the 2.2 branch:
> > Unit/integration tests: https://builds.apache.org/job/kafka-2.2-jdk8/1/
> > System tests:
> > https://jenkins.confluent.io/job/system-test-kafka/job/2.2/216/
> >
> > /**
> >
> > Thanks,
> >
> > Randall Hauch
>


Re: [DISCUSS] KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum

2019-11-05 Thread Elmahdi FRID
Hello folks, is there any status update on this KIP, and is it possible to test this use case?

On 2019/08/01 21:04:46, "Colin McCabe"  wrote: 
> Hi all,
> 
> I've written a KIP about removing ZooKeeper from Kafka.  Please take a look 
> and let me know what you think:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
> 
> cheers,
> Colin
> 


[jira] [Created] (KAFKA-9143) DistributedHerder misleadingly logs an error on connector task reconfiguration

2019-11-05 Thread Ivan Yurchenko (Jira)
Ivan Yurchenko created KAFKA-9143:
-

 Summary: DistributedHerder misleadingly logs an error on connector 
task reconfiguration
 Key: KAFKA-9143
 URL: https://issues.apache.org/jira/browse/KAFKA-9143
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Ivan Yurchenko
Assignee: Ivan Yurchenko


In {{DistributedHerder}}, the {{reconfigureConnectorTasksWithRetry}} method has a 
[callback|https://github.com/apache/kafka/blob/c552c06aed50b4d4d9a85f73ccc89bc06fa7e094/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1247]:

{code:java}
@Override
public void onCompletion(Throwable error, Void result) {
    log.error("Unexpected error during connector task reconfiguration: ", error);
    log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName);
}
{code}

It logs an error message even when the operation succeeded (i.e., {{error}} is 
{{null}}).

It should include an {{if (error != null)}} condition, like [another 
method|https://github.com/apache/kafka/blob/c552c06aed50b4d4d9a85f73ccc89bc06fa7e094/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L792] 
in the same class does.
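
A sketch of the corrected callback (not the final patch):

{code:java}
@Override
public void onCompletion(Throwable error, Void result) {
    if (error != null) {
        log.error("Unexpected error during connector task reconfiguration: ", error);
        log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName);
    }
}
{code}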



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9144) Early expiration of producer state can cause coordinator epoch to regress

2019-11-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9144:
--

 Summary: Early expiration of producer state can cause coordinator 
epoch to regress
 Key: KAFKA-9144
 URL: https://issues.apache.org/jira/browse/KAFKA-9144
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Transaction markers are written by the transaction coordinator. In order to 
fence zombie coordinators, we use the leader epoch associated with the 
coordinator partition. Partition leaders verify the epoch in the WriteTxnMarker 
request and ensure that it can only increase. However, when producer state 
expires, we stop tracking the epoch and it is possible for monotonicity to be 
violated. Generally we expect expiration to be on the order of days, so it 
should be unlikely for this to be a problem.

At least that is the theory. We observed a case where a coordinator epoch 
decreased between nearly consecutive writes within a couple minutes of each 
other. Upon investigation, we found that producer state had been incorrectly 
expired. We believe the sequence of events is the following:
 # Producer writes transactional data and fails before committing
 # Coordinator times out the transaction and writes ABORT markers
 # Upon seeing the ABORT and the bumped epoch, the partition leader deletes 
state from the last epoch, which effectively resets the last timestamp for the 
producer to -1.
 # The coordinator becomes a zombie before getting a successful response and 
continues trying to send
 # The new coordinator notices the incomplete transaction and also sends markers
 # The partition leader accepts the write from the new coordinator
 # The producer state is expired because the last timestamp was -1
 # The partition leader accepts the write from the old coordinator

Basically it takes an alignment of the planets to hit this bug, but it is possible. 
If you hit it, the broker may be unable to start, because we validate epoch 
monotonicity during log recovery. The problem is in step 3, when the timestamp gets 
reset. We should use the timestamp from the marker instead.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


RE: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-05 Thread Senthilnathan Muthusamy
Thanks for pointing it out Eric. Updated the KIP...

Regards,
Senthil
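
For readers catching up on the thread: the strategy under discussion keys
compaction on a record header instead of the offset. Below is a minimal sketch,
using the existing client API, of what producing such a record looks like; the
header name "version", the topic, and the bootstrap address are examples only,
and the real header name would come from the proposed
log.cleaner.compaction.strategy.header config.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompactionHeaderExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("prices", "item-42", "99.95");
            // Under the proposed header strategy, the cleaner would compare this value
            // (rather than the offset) to decide which record for key "item-42" survives.
            record.headers().add("version", "17".getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
        // On the read side, Headers#lastHeader("version") returns the most recently
        // added header for that key, which is why "last header wins" is suggested
        // by Guozhang below.
    }
}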

-Original Message-
From: Guozhang Wang  
Sent: Monday, November 4, 2019 11:52 AM
To: dev 
Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction

Eric,

I think that's a good point; in `Headers.java` we also designed the API to only 
have `Header lastHeader(String key);`. I think picking the last header for that 
key would make more sense, since internally it is organized as a list such that 
newer headers can be considered to "overwrite" the older headers.


Guozhang

On Mon, Nov 4, 2019 at 11:31 AM Eric Azama  wrote:

> Hi Senthilnathan,
>
> Regarding Matthias's point 6, what is the reasoning for choosing the 
> first occurrence of the configured header? I believe this corresponds 
> to the oldest value for given key. If there are multiple values for a 
> key, it seems more intuitive that the newest value is the one that 
> should be used for compaction.
>
> Thanks,
> Eric
>
> On Mon, Nov 4, 2019 at 11:00 AM Guozhang Wang  wrote:
>
> > Hello Senthilnathan,
> >
> > Thanks for revamping on the KIP. I have only one comment about the 
> > wiki otherwise LGTM.
> >
> > 1. We should emphasize that the newly introduced config yields to 
> > the existing "log.cleanup.policy", i.e. if the latter's value is 
> > `delete` not `compact`, then the previous config would be ignored.
> >
> >
> > Guozhang
> >
> > On Mon, Nov 4, 2019 at 9:52 AM Senthilnathan Muthusamy 
> >  wrote:
> >
> > > Hi all,
> > >
> > > I will start the vote thread shortly for this updated KIP. If 
> > > there are any more thoughts I would love to hear them.
> > >
> > > Thanks,
> > > Senthil
> > >
> > > -Original Message-
> > > From: Senthilnathan Muthusamy 
> > > Sent: Thursday, October 31, 2019 3:51 AM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> > >
> > > Hi Matthias
> > >
> > > Thanks for the response.
> > >
> > > (1) Yes
> > >
> > > (2) Yes, and the config name will be the same (i.e.
> > > `log.cleaner.compaction.strategy` &
> > > `log.cleaner.compaction.strategy.header`) at broker level and 
> > > topic
> level
> > > (to override broker level default compact strategy). Please let me 
> > > know
> > if
> > > we need to keep it in different naming convention. Note: Broker 
> > > level (which will be in the server.properties) configuration is 
> > > optional and default it to offset. Topic level configuration will 
> > > be default to
> broker
> > > level config...
> > >
> > > (3) By this new way, it avoids another config parameter and also 
> > > in feature if any new strategy like header need addition info, no
> additional
> > > config required. As this got discussed already and agreed to have
> > separate
> > > config, I will revert it. KIP updated...
> > >
> > > (4) Done
> > >
> > > (5) Updated
> > >
> > > (6) Updated to pick the first header in the list
> > >
> > > Please let me know if you have any other questions.
> > >
> > > Thanks,
> > > Senthil
> > >
> > > -Original Message-
> > > From: Matthias J. Sax 
> > > Sent: Thursday, October 31, 2019 12:13 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> > >
> > > Thanks for picking up this KIP, Senthil.
> > >
> > > (1) As far as I remember, the main issue of the original proposal 
> > > was a missing topic level configuration for the compaction 
> > > strategy. With
> this
> > > being addressed, I am in favor of this KIP.
> > >
> > > (2) With regard to (1), it seems we would need a new topic level 
> > > config `compaction.strategy`, and 
> > > `log.cleaner.compaction.strategy` would be
> the
> > > default strategy (ie, broker level config) if a topic does not
> overwrite
> > it?
> > >
> > > (3) Why did you remove `log.cleaner.compaction.strategy.header`
> > > parameter and change the accepted values of 
> > > `log.cleaner.compaction.strategy` to "header." instead of 
> > > keeping "header"? The original approach seems to be cleaner, and I 
> > > think this
> was
> > > discussed on the original discuss thread already.
> > >
> > > (4) Nit: For the "timestamp" compaction strategy you changed the 
> > > KIP to
> > >
> > > -> `The record [create] timestamp`
> > >
> > > This is misleading IMHO, because it depends on the broker/log 
> > > configuration `(log.)message.timestamp.type` that can either be 
> > > `CreateTime` or `LogAppendTime` what the actual record timestamp 
> > > is. I would just remove "create" to keep it unspecified.
> > >
> > > (5) Nit: the section "Public Interfaces" should list the newly
> introduced
> > > configs -- configuration parameters are a public interface.
> > >
> > > (6) What do you mean by "first level header lookup"? The term 
> > > "first level" indicates some hierarchy, but headers don't have any 
> > > hierarchy
> --
> > > it's just a list of key-value pairs? If you mean the _order_ of 
> > > the headers, ie, pick the first header in the list that matches 
> > > the key,
> > 

RE: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-05 Thread Senthilnathan Muthusamy
Hi Guozhang,

Sure and I have made a note in the JIRA item to make sure the wiki is updated.

Thanks,
Senthil
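
For context on the topic-level override discussed below: once the KIP lands, it
could be set like any other dynamic topic config. A sketch using the existing
AdminClient, where "compaction.strategy" and the value "header" are the names
proposed in this thread and are not recognized by released brokers; the topic
and bootstrap address are placeholders.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetCompactionStrategyExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "prices");
            // "compaction.strategy" is the *proposed* topic-level config from KIP-280;
            // released brokers do not understand it yet.
            AlterConfigOp setStrategy = new AlterConfigOp(
                    new ConfigEntry("compaction.strategy", "header"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                    Collections.singletonMap(topic, Collections.singletonList(setStrategy));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}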

-Original Message-
From: Guozhang Wang  
Sent: Monday, November 4, 2019 11:00 AM
To: dev 
Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction

Hello Senthilnathan,

Thanks for revamping on the KIP. I have only one comment about the wiki 
otherwise LGTM.

1. We should emphasize that the newly introduced config yields to the existing 
"log.cleanup.policy", i.e. if the latter's value is `delete` not `compact`, 
then the previous config would be ignored.


Guozhang

On Mon, Nov 4, 2019 at 9:52 AM Senthilnathan Muthusamy 
 wrote:

> Hi all,
>
> I will start the vote thread shortly for this updated KIP. If there 
> are any more thoughts I would love to hear them.
>
> Thanks,
> Senthil
>
> -Original Message-
> From: Senthilnathan Muthusamy 
> Sent: Thursday, October 31, 2019 3:51 AM
> To: dev@kafka.apache.org
> Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
>
> Hi Matthias
>
> Thanks for the response.
>
> (1) Yes
>
> (2) Yes, and the config name will be the same (i.e.
> `log.cleaner.compaction.strategy` &
> `log.cleaner.compaction.strategy.header`) at broker level and topic 
> level (to override broker level default compact strategy). Please let 
> me know if we need to keep it in different naming convention. Note: 
> Broker level (which will be in the server.properties) configuration is 
> optional and default it to offset. Topic level configuration will be 
> default to broker level config...
>
> (3) By this new way, it avoids another config parameter and also in 
> feature if any new strategy like header need addition info, no 
> additional config required. As this got discussed already and agreed 
> to have separate config, I will revert it. KIP updated...
>
> (4) Done
>
> (5) Updated
>
> (6) Updated to pick the first header in the list
>
> Please let me know if you have any other questions.
>
> Thanks,
> Senthil
>
> -Original Message-
> From: Matthias J. Sax 
> Sent: Thursday, October 31, 2019 12:13 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
>
> Thanks for picking up this KIP, Senthil.
>
> (1) As far as I remember, the main issue of the original proposal was 
> a missing topic level configuration for the compaction strategy. With 
> this being addressed, I am in favor of this KIP.
>
> (2) With regard to (1), it seems we would need a new topic level 
> config `compaction.strategy`, and `log.cleaner.compaction.strategy` 
> would be the default strategy (ie, broker level config) if a topic does not 
> overwrite it?
>
> (3) Why did you remove `log.cleaner.compaction.strategy.header`
> parameter and change the accepted values of 
> `log.cleaner.compaction.strategy` to "header." instead of keeping 
> "header"? The original approach seems to be cleaner, and I think this 
> was discussed on the original discuss thread already.
>
> (4) Nit: For the "timestamp" compaction strategy you changed the KIP 
> to
>
> -> `The record [create] timestamp`
>
> This is misleading IMHO, because it depends on the broker/log 
> configuration `(log.)message.timestamp.type` that can either be 
> `CreateTime` or `LogAppendTime` what the actual record timestamp is. I 
> would just remove "create" to keep it unspecified.
>
> (5) Nit: the section "Public Interfaces" should list the newly 
> introduced configs -- configuration parameters are a public interface.
>
> (6) What do you mean by "first level header lookup"? The term "first 
> level" indicates some hierarchy, but headers don't have any hierarchy 
> -- it's just a list of key-value pairs? If you mean the _order_ of the 
> headers, ie, pick the first header in the list that matches the key, 
> please rephrase it to make it clearer.
>
>
>
> @Tom: I agree with all you are saying, however, I still think that 
> this KIP will improve the overall situation, because everything you 
> pointed out is actually true with offset based compaction, too.
>
> The KIP is not a silver bullet that solves all issue for interleaved 
> writes, but I personally believe, it's a good improvement.
>
>
>
> -Matthias
>
>
> On 10/30/19 9:45 AM, Senthilnathan Muthusamy wrote:
> > Hi,
> >
> > Please let me know if anyone has any questions on this updated KIP-280...
> >
> > Thanks,
> >
> > Senthil
> >
> > -Original Message-
> > From: Senthilnathan Muthusamy 
> > Sent: Monday, October 28, 2019 11:36 PM
> > To: dev@kafka.apache.org
> > Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
> >
> > Hi Tom,
> >
> > Sorry for the delayed response.
> >
> > Regarding the fallback-to-offset decision for both timestamp & header
> > value: it is based on the previous author's discussion at
> > https://lists.apache.org/thread.html/f44317eb6cd34f91966654c80509d4a457dbbccdd02b86645782be67@%3Cdev.kafka.apache.org%3E

RE: [VOTE] KIP-280: Enhanced log compaction

2019-11-05 Thread Senthilnathan Muthusamy
Thanks, Guozhang. I have made a note in the JIRA item to update the wiki.

So far there is 1 binding +1; waiting for 2 more binding +1s. Thanks!

Regards,
Senthil

-Original Message-
From: Guozhang Wang  
Sent: Monday, November 4, 2019 11:01 AM
To: dev 
Subject: Re: [VOTE] KIP-280: Enhanced log compaction

I only have one minor comment on the DISCUSS thread, otherwise I'm +1 (binding).

On Mon, Nov 4, 2019 at 9:53 AM Senthilnathan Muthusamy 
 wrote:

> Hi all,
>
> I would like to start the vote on the updated KIP-280: Enhanced log 
> compaction. Thanks to Guozhang, Matthias & Tom for the valuable 
> feedback on the discussion thread...
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction
>
> JIRA:
> https://issues.apache.org/jira/browse/KAFKA-7061
>
> Thanks,
> Senthil
>


--
-- Guozhang


[jira] [Created] (KAFKA-9145) AbstractCoordinator should respect retry backoff between rebalances

2019-11-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9145:
--

 Summary: AbstractCoordinator should respect retry backoff between 
rebalances
 Key: KAFKA-9145
 URL: https://issues.apache.org/jira/browse/KAFKA-9145
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, KafkaConnect
Reporter: Jason Gustafson
 Fix For: 2.5.0


We hit a situation where the connect worker seems to have been requesting group 
rejoins repeatedly for some reason. In theory the same thing could happen to 
the consumer if a user repeatedly changes the subscribed topics in a tight 
loop. Currently AbstractCoordinator would not backoff in these cases because 
the rebalances may all be completing successfully. It probably makes sense to 
enforce a minimum time between locally triggered rebalances in order to avoid 
overloading the broker with repeated group rebalances.
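
An illustrative sketch (not AbstractCoordinator's actual code) of the kind of guard this implies, i.e. enforcing a minimum interval between locally triggered rejoins:

{code:java}
// Illustrative only -- not the real AbstractCoordinator implementation.
public final class RejoinBackoff {
    private final long minIntervalMs;
    private long lastRejoinMs = -1L;

    public RejoinBackoff(long minIntervalMs) {
        this.minIntervalMs = minIntervalMs;
    }

    /** How long the caller should still wait before issuing the next local rejoin. */
    public synchronized long remainingMs(long nowMs) {
        return lastRejoinMs < 0 ? 0L : Math.max(0L, lastRejoinMs + minIntervalMs - nowMs);
    }

    public synchronized void recordRejoin(long nowMs) {
        lastRejoinMs = nowMs;
    }
}
{code}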



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-536: Propagate broker timestamp to Admin API

2019-11-05 Thread Noa Resare
I agree with that. And looking at the MetadataResponse fields, it seems there 
has already been some feature creep here. Does the client use the rack 
information, for example?

I guess one could do this either by introducing a new, leaner message pair used 
specifically to enable client operation and keeping MetadataRequest/MetadataResponse 
for describeCluster(), or by shrinking MetadataRequest/MetadataResponse and 
introducing a new, more fully featured message pair for the other stuff. I would 
be happy to spend some time looking into implementing this if there is interest.

/noa
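
For context on what the existing call already carries, a minimal sketch of
today's describeCluster() output, which returns id, host, port and rack per
broker but no start time (the bootstrap address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class DescribeClusterExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult result = admin.describeCluster();
            for (Node node : result.nodes().get()) {
                // Today a Node exposes id, host, port and rack -- but no broker start
                // time, which is the gap KIP-536 wants to close.
                System.out.printf("broker %d at %s:%d rack=%s%n",
                        node.id(), node.host(), node.port(), node.rack());
            }
        }
    }
}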

> On 5 Nov 2019, at 15:43, Gwen Shapira  wrote:
> 
> It isn't just about saving space. It increases complexity to default to
> always sharing a bit of information that is really only needed in a single
> use-case.
> We avoid doing this as a matter of good protocol design.
> Arguably, this should not really piggyback on cluster metadata at all,
> since the usage is so different.
> 
> On Tue, Nov 5, 2019, 7:29 AM Noa Resare  wrote:
> 
>> It would certainly be possible to have the field be optional and only
>> include it if some flag is set in the DescribeClusterOptions instance
>> passed to Admin.describeCluster() that in turn would translate to a boolean
>> in MetadataRequest indicating that we are asking for this piece of
>> information.
>> 
>> I’m not entirely sure that this extra complexity would be worth it for the
>> modestly smaller response size, in a message that already contains things
>> like the hostname and rack identifier per broker.
>> 
>> /noa
>> 
>>> On 4 Nov 2019, at 14:45, Gwen Shapira  wrote:
>>> 
>>> Cluster metadata is sent to clients on a very regular basis. Adding
>>> start-time there seems quite repetitive. Especially considering that
>>> this information is only useful in very specific cases.
>>> 
>>> Can we add this capability to the Admin API in a way that won't impact
>>> normal client workflow?
>>> 
>>> On Mon, Nov 4, 2019 at 4:05 AM Noa Resare  wrote:
 
 Thank you for the feedback, Stanislav!
 
 I agree that it would be good to harmonise the naming, and
>> start-time-ms definitely more descriptive.
 
 I have updated the proposal to reflect this, and also added the updated
>> json RPC changes. Please have a look.
 
 /noa
 
> On 1 Nov 2019, at 09:13, Stanislav Kozlovski 
>> wrote:
> 
> Hey Noa,
> 
> KIP-436 added a JMX metric in Kafka for this exact use-case, called
> `start-time-ms`. Perhaps it would be useful to name this public
>> interface
> in the same way for consistency.
> 
> Could you update the KIP to include the specific RPC changes regarding
>> the
> metadata request/responses? Here is a recent example of how to portray
>> the
> changes -
> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response
> 
> Thanks,
> Stanislav!
> 
> On Mon, Oct 14, 2019 at 2:46 PM Noa Resare  wrote:
> 
>> We are in the process of migrating the pieces of automation that
>> currently
>> reads and modifies zookeeper state to use the Admin API.
>> 
>> One of the things that we miss doing this is access to the start time
>> of
>> brokers in a cluster which is used by our automation doing rolling
>> restarts. We currently read this from the timestamp field from the
>> epehmeral broker znodes in zookeeper. To address this limitation, I
>> have
>> authored KIP-536, that proposes adding a timestamp field to the Node
>> class
>> that the AdminClient.describeCluster() method returns.
>> 
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536%3A+Propagate+broker+timestamp+to+Admin+API
>> <
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-536:+Propagate+broker+timestamp+to+Admin+API
>>> 
>> 
>> Any and all feedback is most welcome
>> 
>> cheers
>> noa
> 
> 
> 
> --
> Best,
> Stanislav
 
>> 
>> 



Build failed in Jenkins: kafka-trunk-jdk11 #932

2019-11-05 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9110: Improve efficiency of disk reads when TLS is enabled 
(#7604)


--
[...truncated 4.56 MB...]
kafka.api.PlaintextConsumerTest > 
testPerPartitionLeadMetricsCleanUpWithSubscribe PASSED

kafka.api.PlaintextConsumerTest > testCommitMetadata STARTED

kafka.api.PlaintextConsumerTest > testCommitMetadata PASSED

kafka.api.PlaintextConsumerTest > testHeadersExtendedSerializerDeserializer 
STARTED

kafka.api.PlaintextConsumerTest > testHeadersExtendedSerializerDeserializer 
PASSED

kafka.api.PlaintextConsumerTest > testRoundRobinAssignment STARTED

kafka.api.PlaintextConsumerTest > testRoundRobinAssignment PASSED

kafka.api.PlaintextConsumerTest > testPatternSubscription STARTED

kafka.api.PlaintextConsumerTest > testPatternSubscription PASSED

kafka.api.PlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.PlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.PlaintextConsumerTest > testSimpleConsumption STARTED

kafka.api.PlaintextConsumerTest > testSimpleConsumption PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > closingChannelException STARTED

kafka.network.SocketServerTest > closingChannelException PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress PASSED

kafka.network.SocketServerTest > testIdleConnection STARTED

kafka.network.SocketServerTest > testIdleConnection PASSED

kafka.network.SocketServerTest > 
testClientDisconnectionWithStagedReceivesFullyProcessed STARTED

kafka.network.SocketServerTest > 
testClientDisconnectionWithStagedReceivesFullyProcessed PASSED

kafka.network.SocketServerTest > testZeroMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testZeroMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testMetricCollectionAfterShutdown STARTED

kafka.network.SocketServerTest > testMetricCollectionAfterShutdown PASSED

kafka.network.SocketServerTest > testSessionPrincipal STARTED

kafka.network.SocketServerTest > testSessionPrincipal PASSED

kafka.network.SocketServerTest > configureNewConnectionException STARTED

kafka.network.SocketServerTest > configureNewConnectionException PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides PASSED

kafka.network.SocketServerTest > testControlPlaneRequest STARTED

kafka.network.SocketServerTest > testControlPlaneRequest PASSED

kafka.network.SocketServerTest > processNewResponseException STARTED

kafka.network.SocketServerTest > processNewResponseException PASSED

kafka.network.SocketServerTest > testStagedListenerStartup STARTED

kafka.network.SocketServerTest > testStagedListenerStartup PASSED

kafka.network.SocketServerTest > testConnectionRateLimit STARTED

kafka.network.SocketServerTest > testConnectionRateLimit PASSED

kafka.network.SocketServerTest > processCompletedSendException STARTED

kafka.network.SocketServerTest > processCompletedSendException PASSED

kafka.network.SocketServerTest > processDisconnectedException STARTED

k

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-05 Thread Levani Kokhreidze
Hello all,

While https://github.com/apache/kafka/pull/7170 is under review and almost 
done, I want to resurrect the discussion about this KIP to address a couple of 
concerns raised by Matthias and John.

As a reminder, the idea of KIP-221 was to give DSL users control over 
repartitioning and the parallelism of sub-topologies by:
1) Introducing a new KStream#repartition operation, which is done in 
https://github.com/apache/kafka/pull/7170
2) Adding a new KStream#groupBy(Repartitioned) operation, which is planned as a 
separate PR.

While everyone agrees on the general implementation and the idea behind the 
https://github.com/apache/kafka/pull/7170 PR, introducing the new 
KStream#groupBy(Repartitioned) method overload raised some questions during the 
review.
Matthias raised the concern that there can be cases where the user uses the 
`KStream#groupBy(Repartitioned)` operation but actual repartitioning may not be 
required, so the configuration passed via `Repartitioned` would never be applied 
(Matthias, please correct me if I misinterpreted your comment).
So instead, if the user wants to control the parallelism of sub-topologies, he or 
she should always use the `KStream#repartition` operation before groupBy. The full 
comment can be seen here: 
https://github.com/apache/kafka/pull/7170#issuecomment-519303125

On the same topic, John pointed out that, from an API design perspective, we 
shouldn't intertwine the configuration classes of different operators. So instead 
of introducing a new `KStream#groupBy(Repartitioned)` overload for specifying the 
number of partitions of the internal topic, we should extend the existing 
`Grouped` class with a `numberOfPartitions` field.

Personally, I think Matthias's concern is valid, but on the other hand Kafka 
Streams already has an optimizer in place that alters the topology independently 
of the user. So maybe it makes sense for Kafka Streams to internally optimize the 
topology in the best way possible, even if in some cases this means ignoring some 
operator configurations passed by the user. Also, I agree with John about the API 
design semantics. If we go through with the changes to the `KStream#groupBy` 
operation, it makes more sense to add a `numberOfPartitions` field to the 
`Grouped` class instead of introducing a new `KStream#groupBy(Repartitioned)` 
method overload.

I would really appreciate the community's feedback on this.

Kind regards,
Levani
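
For readers who haven't looked at the PR, the new operation reads roughly like
the topology fragment below (a sketch based on the API proposed in the KIP;
names could still change during review, and the topic name and partition count
are examples):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> clicks = builder.stream("clicks");

// Explicitly repartition with a chosen name and partition count instead of relying
// on the repartition topic that Streams would otherwise insert automatically.
KStream<String, String> byUser = clicks.repartition(
        Repartitioned.<String, String>as("clicks-by-user")
                .withNumberOfPartitions(12));

byUser.groupByKey().count();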



> On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman  wrote:
> 
> Hey Levani,
> 
> I think people are busy with the upcoming 2.4 release, and don't have much
> spare time at the
> moment. It's kind of a difficult time to get attention on things, but feel
> free to pick up something else
> to work on in the meantime until things have calmed down a bit!
> 
> Cheers,
> Sophie
> 
> 
> On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze  >
> wrote:
> 
>> Hello all,
>> 
>> Sorry for bringing this thread again, but I would like to get some
>> attention on this PR: https://github.com/apache/kafka/pull/7170 
>>  <
>> https://github.com/apache/kafka/pull/7170 
>> >
>> It's been a while now and I would love to move on to other KIPs as well.
>> Please let me know if you have any concerns.
>> 
>> Regards,
>> Levani
>> 
>> 
>>> On Jul 26, 2019, at 11:25 AM, Levani Kokhreidze >> >
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> Here’s voting thread for this KIP:
>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html 
>>  <
>> https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html 
>> >
>>> 
>>> Regards,
>>> Levani
>>> 
 On Jul 24, 2019, at 11:15 PM, Levani Kokhreidze >>> 
>> >> wrote:
 
 Hi Matthias,
 
 Thanks for the suggestion. I Don’t have strong opinion on that one.
 Agree that avoiding unnecessary method overloads is a good idea.
 
 Updated KIP
 
 Regards,
 Levani
 
 
> On Jul 24, 2019, at 8:50 PM, Matthias J. Sax  
>> >> wrote:
> 
> One question:
> 
> Why do we add
> 
>> Repartitioned#with(final String name, final int numberOfPartitions)
> 
> It seems that `#with(String name)`, `#numberOfPartitions(int)` in
> combination with `withName()` and `withNumberOfPartitions()` should be
> sufficient. Users can chain the method calls.
> 
> (I think it's valuable to keep the number of overload small if
>> possible.)
> 
> Otherwise LGTM.
> 

Build failed in Jenkins: kafka-2.4-jdk8 #52

2019-11-05 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] MINOR: Rework NewPartitionReassignment public API (#7638)


--
[...truncated 2.69 MB...]
org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
STARTED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
PASSED

org.apache.kafka.streams.TestTopicsTest > testDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testValue STARTED

org.apache.kafka.streams.TestTopicsTest > testValue PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName PASSED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMapWithNull PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics STARTED

org.apache.kafka.streams.TestTopicsTest > testMultipleTopics PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueList PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputWithNullDriver PASSED

org.apache.kafka.streams.TestTopicsTest > testValueList STARTED

org.apache.kafka.streams.TestTopicsTest > testValueList PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordList PASSED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonExistingInputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValuesToMap PASSED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList STARTED

org.apache.kafka.streams.TestTopicsTest > testRecordsToList PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValueListDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testInputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testInputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders STARTED

org.apache.kafka.streams.TestTopicsTest > testWithHeaders PASSED

org.apache.kafka.streams.TestTopicsTest > testKeyValue STARTED

org.apache.kafka.streams.TestTopicsTest > testKeyValue PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateTopicWithNullTopicName PASSED

> Task :streams:upgrade-system-tests-0101:compileJava NO-SOURCE
> Task :streams:upgrade-system-tests-0101:processResources NO-SOURCE
> Task :streams:upgrade-system-tests-0101:classes UP-TO-DATE
> Task :streams:upgrade-system-tests-0101:checkstyl

[jira] [Created] (KAFKA-9146) Add option to force delete members in stream reset tool

2019-11-05 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9146:
--

 Summary: Add option to force delete members in stream reset tool
 Key: KAFKA-9146
 URL: https://issues.apache.org/jira/browse/KAFKA-9146
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Reporter: Boyang Chen


Sometimes people want to reset the stream application sooner, but they are blocked 
by the left-over members inside the group coordinator, which only expire after the 
session timeout. When a user configures a really long session timeout, it could 
prevent the group from clearing. We should consider adding support for cleaning up 
members by forcing them to leave the group. To do that, 
 # If the stream application is already on static membership, we could call 
adminClient.removeMembersFromGroup directly.
 # If the application is on dynamic membership, we should modify the 
adminClient.removeMembersFromGroup interface to allow deletion based on 
member.id.
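
For the static-membership case in point 1, the existing admin call looks roughly like the sketch below; the bootstrap address, group id and group.instance.id are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class ForceRemoveMemberExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Works today only for static members identified by group.instance.id;
            // point 2 above is about extending removal to dynamic member.id as well.
            admin.removeMembersFromConsumerGroup(
                    "my-streams-app",
                    new RemoveMembersFromConsumerGroupOptions(
                            Collections.singleton(new MemberToRemove("instance-1"))))
                 .all().get();
        }
    }
}
{code}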



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk8 #4018

2019-11-05 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9110: Improve efficiency of disk reads when TLS is enabled 
(#7604)


--
[...truncated 8.30 MB...]

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shou

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2019-11-05 Thread Sophie Blee-Goldman
> Personally, I think Matthias’s concern is valid, but on the other hand
Kafka Streams has already
> optimizer in place which alters topology independently from user

I agree (with you) and think this is a good way to put it -- we currently
auto-repartition for the user so
that they don't have to walk through their entire topology and reason about
when and where to place a
`.through` (or the new `.repartition`), so why suddenly force this onto the
user? How certain are we that
users will always get this right? It's easy to imagine that during
development, you write your new app with
correctly placed repartitions in order to use this new feature. During the
course of development you end up
tweaking the topology, but don't remember to review or move the
repartitioning since you're used to Streams
doing this for you. If you use only single-partition topics for testing,
you might not even notice your app is
spitting out incorrect results!

Anyways, I feel pretty strongly that it would be weird to introduce a new
feature and say that to use it, you can't take
advantage of this other feature anymore. Also, is it possible our
optimization framework could ever include an
optimized repartitioning strategy that is better than what a user could
achieve by manually inserting repartitions?
Do we expect users to have a deep understanding of the best way to
repartition their particular topology, or is it
likely they will end up over-repartitioning either due to missed
optimizations or unnecessary extra repartitions?
I think many users would prefer to just say "if there *is* a repartition
required at this point in the topology, it should
have N partitions"

As to the idea of adding `numberOfPartitions` to Grouped rather than
adding a new parameter to groupBy, that does seem more in line with the
current syntax so +1 from me
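
Just to make that concrete, here's a rough sketch of how it could look from
the user's side (note: `withNumberOfPartitions` is purely hypothetical and
only meant to illustrate the idea, it's not part of any API today):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream("input-topic");

stream.groupBy((key, value) -> value,
        // hypothetical option -- would only take effect if a repartition is actually needed
        Grouped.with(Serdes.String(), Serdes.String())
               .withNumberOfPartitions(12))
      .count();

That way the hint is simply a no-op whenever Streams decides no repartition
is required, which seems consistent with not forcing users to reason about
repartition placement themselves.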

On Tue, Nov 5, 2019 at 2:07 PM Levani Kokhreidze 
wrote:

> Hello all,
>
> While https://github.com/apache/kafka/pull/7170 <
> https://github.com/apache/kafka/pull/7170> is under review and it’s
> almost done, I want to resurrect discussion about this KIP to address
> couple of concerns raised by Matthias and John.
>
> As a reminder, idea of the KIP-221 was to allow DSL users control over
> repartitioning and parallelism of sub-topologies by:
> 1) Introducing new KStream#repartition operation which is done in
> https://github.com/apache/kafka/pull/7170 <
> https://github.com/apache/kafka/pull/7170>
> 2) Add new KStream#groupBy(Repartitioned) operation, which is planned to
> be separate PR.
>
> While all agree about general implementation and idea behind
> https://github.com/apache/kafka/pull/7170 <
> https://github.com/apache/kafka/pull/7170> PR, introducing new
> KStream#groupBy(Repartitioned) method overload raised some questions during
> the review.
> Matthias raised a concern that there can be cases where a user uses the
> `KStream#groupBy(Repartitioned)` operation but actual repartitioning may
> not be required, thus the configuration passed via `Repartitioned` would never
> be applied (Matthias, please correct me if I misinterpreted your comment).
> So instead, if user wants to control parallelism of sub-topologies, he or
> she should always use `KStream#repartition` operation before groupBy. Full
> comment can be seen here:
> https://github.com/apache/kafka/pull/7170#issuecomment-519303125 <
> https://github.com/apache/kafka/pull/7170#issuecomment-519303125>
>
> On the same topic, John pointed out that, from API design perspective, we
> shouldn’t intertwine configuration classes of different operators between
> one another. So instead of introducing new `KStream#groupBy(Repartitioned)`
> for specifying number of partitions for internal topic, we should update
> existing `Grouped` class with `numberOfPartitions` field.
>
> Personally, I think Matthias’s concern is valid, but on the other hand
> Kafka Streams already has an optimizer in place which alters the topology
> independently from the user. So maybe it makes sense if Kafka Streams
> internally optimizes the topology in the best way possible, even if in
> some cases this means ignoring some operator configurations passed by the
> user. Also, I agree with John about the API design semantics. If we go through
> with the changes for the `KStream#groupBy` operation, it makes more sense to
> add a `numberOfPartitions` field to the `Grouped` class instead of introducing
> a new `KStream#groupBy(Repartitioned)` method overload.
>
> I would really appreciate communities feedback on this.
>
> Kind regards,
> Levani
>
>
>
> > On Oct 17, 2019, at 12:57 AM, Sophie Blee-Goldman 
> wrote:
> >
> > Hey Levani,
> >
> > I think people are busy with the upcoming 2.4 release, and don't have
> much
> > spare time at the
> > moment. It's kind of a difficult time to get attention on things, but
> feel
> > free to pick up something else
> > to work on in the meantime until things have calmed down a bit!
> >
> > Cheers,
> > Sophie
> >
> >
> > On Wed, Oct 16, 2019 at 11:26 AM Levani Kokhreidze <
> levani

[jira] [Resolved] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-11-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8367.

Resolution: Fixed

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config (see the sketch after this description for how such settings are typically applied via RocksDBConfigSetter):
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  
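
For reference, settings like the ones listed above are typically applied in
Streams through a custom RocksDBConfigSetter. A minimal sketch using the
values from this description (option setters assumed from the RocksDB 5.x
Java API; this is not the reporter's actual class):

{code:java}
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

import java.util.Map;

public class BoundedRocksDBConfig implements RocksDBConfigSetter {
  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
    tableConfig.setBlockCacheSize(4 * 1024 * 1024L);   // 4 MiB block cache
    tableConfig.setBlockSize(16 * 1024L);              // 16 KiB blocks
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setTableFormatConfig(tableConfig);
    options.setWriteBufferSize(2 * 1024 * 1024L);      // 2 MiB write buffer
    options.setMaxWriteBufferNumber(3);
    options.setMaxOpenFiles(6144);
    options.setManifestPreallocationSize(64 * 1024L);  // 64 KiB
  }
}
{code}

Such a class is registered via the rocksdb.config.setter property
(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG).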



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9147) zookeeper service not running

2019-11-05 Thread parimal (Jira)
parimal created KAFKA-9147:
--

 Summary: zookeeper service not running 
 Key: KAFKA-9147
 URL: https://issues.apache.org/jira/browse/KAFKA-9147
 Project: Kafka
  Issue Type: Test
Affects Versions: 2.3.0
 Environment: Ubuntu
Reporter: parimal


I was able to start the zookeeper service on standalone Ubuntu using the command

 

root@N-5CG73531RZ:/# /usr/local/zookeeper/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

 

However when I do ps -ef I don't see any zookeeper service running 

 

root@N-5CG73531RZ:/# ps -ef
UID PID PPID C STIME TTY TIME CMD
root 1 0 0 Nov04 ? 00:00:00 /init
root 5 1 0 Nov04 tty1 00:00:00 /init
pgarg00 6 5 0 Nov04 tty1 00:00:00 -bash
root 2861 6 0 Nov04 tty1 00:00:00 sudo -i
root 2862 2861 0 Nov04 tty1 00:00:03 -bash
root 5347 1 0 18:24 ? 00:00:00 /usr/sbin/sshd
root 5367 1 0 18:25 ? 00:00:00 /usr/sbin/inetd
root 8950 2862 0 19:15 tty1 00:00:00 ps -ef

 

Also when I do telnet, the connection is refused 

root@N-5CG73531RZ:/# telnet localhost 2181
Trying 127.0.0.1...
telnet: Unable to connect to remote host: Connection refused

 

Can you please help me?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9148) Consider forking RocksDB for Streams

2019-11-05 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9148:
--

 Summary: Consider forking RocksDB for Streams 
 Key: KAFKA-9148
 URL: https://issues.apache.org/jira/browse/KAFKA-9148
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (WriteBufferManager -- KAFKA-8215). Unfortunately, someone recently 
discovered a ~8% performance regression that exists in all versions 5.18+ 
(latest being 6.2.2 as of now). Flink was able to react to this by downgrading 
to 5.17 and picking the WriteBufferManager to their fork, FRocksDB.

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

 

Pros:
 * We can avoid passing sudden breaking changes on to our users, such as removal 
of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and configuration options 
that have been in the c++ API for a very long time
 ** Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
 ** Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful features prohibitively expensive.
 *** [Custom 
comparator|https://github.com/facebook/rocksdb/issues/538#issuecomment-83145980]:
 a custom comparator could significantly improve the performance of session 
windows
 *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
currently used by Streams but a commonly requested feature, and may also allow 
improved range queries
 ** Even when an external contributor develops a solution for poorly performing 
Java functionality and helpfully tries to contribute their patch back to 
rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
#2283|https://github.com/facebook/rocksdb/pull/2283])


Cons:
 * more work

 

Given that we rarely upgrade the Rocks dependency, use only some fraction of 
its features, and would need or want to make only minimal changes ourselves, it 
seems like we could actually get away with very little extra work by forking 
rocksdb. 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Vinoth Chandar
Ping :) Any thoughts?

On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:

> >>  I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which different
> instances are running totally different topologies.
> 3. @both Without the topic partitions that the tasks can map back to, we
> have to rely on topology/cluster metadata in each Streams instance to map
> the task back. If the source topics are wildcarded, for example, then each
> instance could have different source topics in its topology until the next
> rebalance happens. You can also read my comments from here
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>
>
> >> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids, which
> are just two integers.
> 3. If you still have concerns about the efficacy of dictionary encoding,
> happy to engage. The link above also has some benchmark code I used.
> Theoretically, we would send each topic name at least once, so yes, if you
> compare a 10-20 character topic name + an integer to two integers, it will
> be more bytes. But it's a constant overhead proportional to the size of the
> topic name, and with 4, 8, or 12 partitions the size difference between the
> baseline (version 4, where we just repeated topic names for each topic
> partition) and the two approaches becomes narrow.
>
> >>Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place, if
> ever.
> >>it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.
> 3. Isn't people changing the same parts of code and tracking follow-ups a
> common thing we need to deal with anyway? For this KIP, isn't it enough
> to reason about whether the additional map on top of the topic dictionary
> would incur more overhead than sending task_ids? I don't think that's the
> case; both of them send two integers. As I see it, we can do a separate
> follow-up to (re)pursue the task_id conversion and get it working for both
> maps within the next release?
>
> >>Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and another
> for offset-lag, so are they not already broken up?
> The current APIs for lags (e.g. lagInfoForStore) return StoreLagInfo
> objects which have both time and offset lags. If we had separate APIs
> (e.g. offsetLagForStore(), timeLagForStore()), we could implement the offset
> version using the offset lag that the Streams instance already tracks, i.e.
> no need for external calls. The time-based lag API would incur the Kafka
> read for the timestamp. Makes sense?
>
> Based on the discussions so far, I only see these two pending issues to be
> aligned on. Is there any other open item people want to bring up?
>
> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman 
> wrote:
>
>> Regarding 3) I'm wondering, does your concern still apply even now
>> that the pluggable PartitionGrouper interface has been deprecated?
>> Now that we can be sure that the DefaultPartitionGrouper is used to
>> generate
>> the taskId -> partitions mapping, we should be able to convert any taskId
>> to any
>> partitions.
>>
>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
>>
>> > Hey Vinoth, thanks for the reply!
>> >
>> > 3.
>> > I get that it's not the main focus of this KIP, but if it's ok, it
>> > would be nice to hash out this point right now. It only came up
>> > because this KIP-535 is substantially extending the pattern in
>> > question. If we push it off until later, then the reviewers are going
>> > to have to suspend their concerns not just while voting for the KIP,
>> > but also while reviewing the code. Plus, Navinder is going to
>> > implement a bunch of protocol code that we might just want to change
>> > when the discussion actually does take place, if ever. Finally, it'll
>> > just be a mental burden for everyone to remember that we want to have
>> > this follow-up discussion.
>> >
>> > It makes sense what you say... the specific assignment is already
>> > encoded in the "main" portion of the assignment, not in the "userdata"
>> > part. It also makes sense that it's simpler to reason about races if
>> > you simply get all the information about the topics and partitions
>> > directly from the assignor, rather than get the partition number from
>> > the assignor and the topic name from your own a priori knowledge of
>> > the topology. On the other hand, I'm having some trouble wrapping my
>> > head around what race conditions might occur, other than the
>> > fundamentally broken state in which different instances are running
>> > totally different topologies. Sorry, but can you remind us of the
>> > specific condition

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread John Roesler
Hey Vinoth,

Really sorry, I just remembered that I started a reply earlier today,
but got side-tracked.

Regarding the AssignmentInfo extension:

Your explanation for this point makes sense. I was incorrectly
thinking that the cluster metadata was shared with all members, but
now I see it's only given to the assignor. I agree now that the
assignor basically has to encode this information in the userdata
field if it wants the members to have it. Thanks for your patience in
explaining and linking the relevant history.

Given this constraint, the encoding part of the discussion is moot.
Regardless, the detail you provided does make sense to me.

I'm now in favor of the proposal for extending AssignmentInfo.


Regarding "breaking up the API":

Ah, my mistake. Yes, it sounds like this would be a good idea. I just
took another look at KafkaStreams. Since there's no method for getting
all the local stores, perhaps we can skip the "get the lags for all
stores" method, and just add two new methods to the KafkaStreams
interface like this:

KafkaStreams {
// existing
<T> T store(String storeName, QueryableStoreType<T> queryableStoreType)

// new
/* Report the current amount by which the local store lags behind the
changelog tail. This is an indicator of how fresh the local copy of a
store is with respect to the active copy. Always 0 for stores in
active tasks. */
long storeChangelogOffsetLag(String storeName)

/* Report the time difference between the last consumed changelog
record's timestamp and the changelog tail record's timestamp. This is
an indicator of how fresh the local copy of a store is with respect to
the active copy. Always Duration.ZERO for stores in active tasks. */
Duration storeChangelogTimeLag(String storeName)
}

Note, I'm not insisting on this interface, just proposing it to
potentially minimize back-and-forth. Here's the reasoning:
* Since this API is no longer reporting lags for all stores, just
local ones, it makes sense to try and stick close to the `store(name,
type)` method. This also brings the new methods down to two. If others
think there's a use case for getting all the stores' lags, then we can
also propose to add corresponding `all*Lags` methods that return
`Map`.
* I also just realized that we were proposing to add
`timeLagEstimateMs()` as a `long`, but as a project, we have a larger
evolution to migrate to `Duration` and `Instant` where applicable. I
think it makes sense to do that in this case.
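
In case it helps, here is roughly how I'd picture a caller using the two
methods (just a sketch -- the names are the proposed ones above, so none of
this exists yet; `streams` is the KafkaStreams instance):

// decide whether the local standby copy of "my-store" is fresh enough to query
final long offsetLag = streams.storeChangelogOffsetLag("my-store");
final Duration timeLag = streams.storeChangelogTimeLag("my-store");

if (offsetLag < 10_000L && timeLag.compareTo(Duration.ofMinutes(1)) < 0) {
    // serve the (possibly stale) read from the local store
} else {
    // route the query to the active host instead
}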

How does this sound?
Thanks,
-John



On Tue, Nov 5, 2019 at 7:50 PM Vinoth Chandar  wrote:
>
> Ping :) Any thoughts?
>
> On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:
>
> > >>  I'm having some trouble wrapping my head around what race conditions
> > might occur, other than the fundamentally broken state in which different
> > instances are running totally different topologies.
> > 3. @both Without the topic partitions that the tasks can map back to, we
> > have to rely on topology/cluster metadata in each Streams instance to map
> > the task back. If the source topics are wild carded for e,g then each
> > instance could have different source topics in topology, until the next
> > rebalance happens. You can also read my comments from here
> > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >
> >
> > >> seems hard to imagine how encoding arbitrarily long topic names plus an
> > integer for the partition number could be as efficient as task ids, which
> > are just two integers.
> > 3. if you still have concerns about the efficacy of dictionary encoding,
> > happy to engage. The link above also has some benchmark code I used.
> > Theoretically, we would send each topic name atleast once, so yes if you
> > compare a 10-20 character topic name + an integer to two integers, it will
> > be more bytes. But its constant overhead proportional to size of topic name
> > and with 4,8,12, partitions the size difference between baseline (version 4
> > where we just repeated topic names for each topic partition) and the two
> > approaches becomes narrow.
> >
> > >>Plus, Navinder is going to implement a bunch of protocol code that we
> > might just want to change when the discussion actually does take place, if
> > ever.
> > >>it'll just be a mental burden for everyone to remember that we want to
> > have this follow-up discussion.
> > 3. Is n't people changing same parts of code and tracking follow ups a
> > common thing, we need to deal with anyway?  For this KIP, is n't it enough
> > to reason about whether the additional map on top of the topic dictionary
> > would incur more overhead than the sending task_ids? I don't think it's
> > case, both of them send two integers. As I see it, we can do a separate
> > follow up to (re)pursue the task_id conversion and get it working for both
> > maps within the next release?
> >
> > >>Can you elaborate on "breaking up the API"? It looks like the

Jenkins build is back to normal : kafka-trunk-jdk11 #933

2019-11-05 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Navinder Brar
Thanks John and Vinoth for converging thoughts on AssignmentInfo.

   - "Report the time difference between the last consumed changelog record's
timestamp and the changelog tail record's timestamp. This is an indicator of
how fresh the local copy of a store is with respect to the active copy. Always
Duration.ZERO for stores in active tasks"
     >> I think for restoring active tasks this could still be non-zero.
   - I agree that if there is no use case for allLagInfo(), maybe it's not needed at
all.

Regards,
Navinder

On Wednesday, 6 November, 2019, 09:00:39 am IST, John Roesler 
 wrote:  
 
 Hey Vinoth,

Really sorry, I just remembered that I started a reply earlier today,
but got side-tracked.

Regarding the AssignmentInfo extension:

Your explanation for this point makes sense. I was incorrectly
thinking that the cluster metadata was shared with all members, but
now I see it's only given to the assignor. I agree now that the
assignor basically has to encode this information in the userdata
field if it wants the members to have it. Thanks for your patience in
explaining and linking the relevant history.

Given this constraint, the encoding part of the discussion is moot.
Regardless, the detail you provided does make sense to me.

I'm now in favor of the proposal for extending AssignmentInfo.


Regarding "breaking up the API":

Ah, my mistake. Yes, it sounds like this would be a good idea. I just
took another look at KafkaStreams. Since there's no method for getting
all the local stores, perhaps we can skip the "get the lags for all
stores" method, and just add two new methods to the KafkaStreams
interface like this:

KafkaStreams {
// existing
<T> T store(String storeName, QueryableStoreType<T> queryableStoreType)

// new
/* Report the current amount by which the local store lags behind the
changelog tail. This is an indicator of how fresh the local copy of a
store is with respect to the active copy. Always 0 for stores in
active tasks. */
long storeChangelogOffsetLag(String storeName)

/* Report the time difference between the last consumed changelog
record's timestamp and the changelog tail record's timestamp. This is
an indicator of how fresh the local copy of a store is with respect to
the active copy. Always Duration.ZERO for stores in active tasks. */
Duration storeChangelogTimeLag(String storeName)
}

Note, I'm not insisting on this interface, just proposing it to
potentially minimize back-and-forth. Here's the reasoning:
* Since this API is no longer reporting lags for all stores, just
local ones, it makes sense to try and stick close to the `store(name,
type)` method. This also brings the new methods down to two. If others
think there's a use case for getting all the stores' lags, then we can
also propose to add corresponding `all*Lags` methods that return
`Map`.
* I also just realized that we were proposing to add
`timeLagEstimateMs()` as a `long`, but as a project, we have a larger
evolution to migrate to `Duration` and `Instant` where applicable. I
think it makes sense to do that in this case.

How does this sound?
Thanks,
-John



On Tue, Nov 5, 2019 at 7:50 PM Vinoth Chandar  wrote:
>
> Ping :) Any thoughts?
>
> On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:
>
> > >>  I'm having some trouble wrapping my head around what race conditions
> > might occur, other than the fundamentally broken state in which different
> > instances are running totally different topologies.
> > 3. @both Without the topic partitions that the tasks can map back to, we
> > have to rely on topology/cluster metadata in each Streams instance to map
> > the task back. If the source topics are wild carded for e,g then each
> > instance could have different source topics in topology, until the next
> > rebalance happens. You can also read my comments from here
> > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >
> >
> > >> seems hard to imagine how encoding arbitrarily long topic names plus an
> > integer for the partition number could be as efficient as task ids, which
> > are just two integers.
> > 3. if you still have concerns about the efficacy of dictionary encoding,
> > happy to engage. The link above also has some benchmark code I used.
> > Theoretically, we would send each topic name atleast once, so yes if you
> > compare a 10-20 character topic name + an integer to two integers, it will
> > be more bytes. But its constant overhead proportional to size of topic name
> > and with 4,8,12, partitions the size difference between baseline (version 4
> > where we just repeated topic names for each topic partition) and the two
> > approaches becomes narrow.
> >
> > >>Plus, Navinder is going to implement a bunch of protocol code that we
> > might just want to change when the discussion actually does take place, if
> > ever.
> > >>it'll just be a mental burden for everyone to remember that we wa

Re: [VOTE] KIP-150 - Kafka-Streams Cogroup

2019-11-05 Thread Matthias J. Sax
+1 (binding)


On 10/31/19 10:52 AM, Walker Carlson wrote:
> Hello all,
> 
> I'd like to call a vote on the updated KIP-150: Kafka-Streams Cogroup
> found here
> 
> 
> Thanks,
> Walker
> 





[jira] [Created] (KAFKA-9149) Avoid temp byte array creation when use ByteBufferSerializer

2019-11-05 Thread chenxu (Jira)
chenxu created KAFKA-9149:
-

 Summary: Avoid temp byte array creation when use 
ByteBufferSerializer
 Key: KAFKA-9149
 URL: https://issues.apache.org/jira/browse/KAFKA-9149
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: chenxu


Code in ByteBufferSerializer#serialize like this
{code:java}
public byte[] serialize(String topic, ByteBuffer data) {
  if (data == null)
return null;
  data.rewind();
  if (data.hasArray()) {
byte[] arr = data.array();
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
  return arr;
}
  }
  byte[] ret = new byte[data.remaining()];
  data.get(ret, 0, ret.length);
  data.rewind();
  return ret;
}
{code}
A temp byte array will be created when used with a DirectByteBuffer. How about 
declaring a method such as serialize2Buffer that returns the ByteBuffer directly? 
This could improve GC a lot in KafkaProducer.
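
A rough sketch of what such a method could look like (serialize2Buffer is just
the name suggested above, not an existing API):

{code:java}
// hypothetical addition to ByteBufferSerializer -- not part of the current API
public ByteBuffer serialize2Buffer(String topic, ByteBuffer data) {
  if (data == null)
    return null;
  data.rewind();
  // hand back the (possibly direct) buffer as-is, avoiding the intermediate byte[] copy
  return data;
}
{code}

The producer would of course also need to accept a ByteBuffer payload for this
to pay off, which is probably the larger part of the change.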



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Matthias J. Sax
Navinder,

thanks for updating the KIP. Couple of follow up questions:


(10) Why do we need to introduce the class `KeyQueryMetadata`?

(20) Why do we introduce the two methods `allMetadataForKey()`? Would it
not be simpler to add `Collection
standbyMetadataForKey(...)`. This would align with new methods
`#allStandbyMetadata()` and `#allStandbyMetadataForStore()`?

(30) Why do we need the class `StoreLagInfo` -- it seems simpler to just
extend `StreamMetadata` with the corresponding attributes and methods
(for active tasks, the lag would always be reported as zero)

(32) Via (30) we can avoid the two new methods `#allLagInfo()` and
`#lagInfoForStore()`, too, reducing public API and making it simpler to
use the feature.

Btw: If we make `StreamMetadata` thread safe, the lag information can be
updated in the background without the need that the application
refreshes its metadata. Hence, the user can get active and/or standby
metadata once, and only needs to refresh it, if a rebalance happened.


About point (4) of the previous thread: I was also thinking about
when/how to update the time-lag information, and I agree that we should
not update it for each query.

"How": That we need to fetch the last record is a little bit
unfortunate, but I don't see any other way without a broker change. One
issue I still see is with "exactly-once" -- if transaction markers are
in the topic, the last message is not at offset "endOffset - 1" and as
multiple transaction markers might be after each other, it's unclear how
to identify the offset of the last record... Thoughts?
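
(For the non-EOS case, the lookup would be something along these lines -- just
a sketch to illustrate the extra read, with `consumer` being a plain consumer
assigned to the changelog partition, not a concrete proposal:

  final long endOffset =
      consumer.endOffsets(Collections.singletonList(changelogPartition)).get(changelogPartition);
  if (endOffset > 0) {
    consumer.seek(changelogPartition, endOffset - 1);
    for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
      final long tailTimestamp = record.timestamp();
      // time lag = tailTimestamp - timestamp of the last locally restored record
    }
  }

With EOS, "endOffset - 1" may point at a transaction marker instead of a real
record, which is exactly the problem described above.)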

Hence, it might be worth to look into a broker change as a potential
future improvement. It might be possible that the broker caches the
latest timestamp per partition to serve this data efficiently, similar
to `#endOffset()`.

"When": We refresh the end-offset information based on the
`commit.interval.ms` -- doing it more often is not really useful, as
state store caches will most likely buffer up all writes to changelogs
anyway and are only flushed on commit (including a flush of the
producer). Hence, I would suggest to update the time-lag information
based on the same strategy in the background. This way there is no
additional config or methods and the user does not need to worry about
it at all.

To avoid refresh overhead if we don't need it (a user might not use IQ
to begin with), it might be worth to maintain an internal flag
`updateTimeLagEnabled` that is set to `false` initially and only set to
`true` on the first call of a user to get standby-metadata.


-Matthias



On 11/4/19 5:13 PM, Vinoth Chandar wrote:
>>>  I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which different
> instances are running totally different topologies.
> 3. @both Without the topic partitions that the tasks can map back to, we
> have to rely on topology/cluster metadata in each Streams instance to map
> the task back. If the source topics are wild carded for e,g then each
> instance could have different source topics in topology, until the next
> rebalance happens. You can also read my comments from here
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> 
> 
>>> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids, which
> are just two integers.
> 3. if you still have concerns about the efficacy of dictionary encoding,
> happy to engage. The link above also has some benchmark code I used.
> Theoretically, we would send each topic name atleast once, so yes if you
> compare a 10-20 character topic name + an integer to two integers, it will
> be more bytes. But its constant overhead proportional to size of topic name
> and with 4,8,12, partitions the size difference between baseline (version 4
> where we just repeated topic names for each topic partition) and the two
> approaches becomes narrow.
> 
>>> Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place, if
> ever.
>>> it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.
> 3. Is n't people changing same parts of code and tracking follow ups a
> common thing, we need to deal with anyway?  For this KIP, is n't it enough
> to reason about whether the additional map on top of the topic dictionary
> would incur more overhead than the sending task_ids? I don't think it's
> case, both of them send two integers. As I see it, we can do a separate
> follow up to (re)pursue the task_id conversion and get it working for both
> maps within the next release?
> 
>>> Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and another
> for offse

Build failed in Jenkins: kafka-trunk-jdk11 #934

2019-11-05 Thread Apache Jenkins Server
See 


Changes:

[cmccabe] KAFKA-9137: Fix incorrect FetchSessionCache eviction logic (#7640)

[wangguoz] HOTFIX: remove reference to unused Assignment error code (#7645)


--
[...truncated 2.74 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldP

Re: [DISCUSS] KIP-280: Enhanced log compaction

2019-11-05 Thread Matthias J. Sax
Thanks for updating the KIP, Senthil.

@Eric: good point about using the last found header for the key instead
of the first!

I don't have any further comments at this point.


-Matthias

On 11/5/19 11:37 AM, Senthilnathan Muthusamy wrote:
> Hi Guozhang,
> 
> Sure and I have made a note in the JIRA item to make sure the wiki is updated.
> 
> Thanks,
> Senthil
> 
> -Original Message-
> From: Guozhang Wang  
> Sent: Monday, November 4, 2019 11:00 AM
> To: dev 
> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
> 
> Hello Senthilnathan,
> 
> Thanks for revamping on the KIP. I have only one comment about the wiki 
> otherwise LGTM.
> 
> 1. We should emphasize that the newly introduced config yields to the 
> existing "log.cleanup.policy", i.e. if the latter's value is `delete` not 
> `compact`, then the previous config would be ignored.
> 
> 
> Guozhang
> 
> On Mon, Nov 4, 2019 at 9:52 AM Senthilnathan Muthusamy 
>  wrote:
> 
>> Hi all,
>>
>> I will start the vote thread shortly for this updated KIP. If there 
>> are any more thoughts I would love to hear them.
>>
>> Thanks,
>> Senthil
>>
>> -Original Message-
>> From: Senthilnathan Muthusamy 
>> Sent: Thursday, October 31, 2019 3:51 AM
>> To: dev@kafka.apache.org
>> Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
>>
>> Hi Matthias
>>
>> Thanks for the response.
>>
>> (1) Yes
>>
>> (2) Yes, and the config name will be the same (i.e.
>> `log.cleaner.compaction.strategy` &
>> `log.cleaner.compaction.strategy.header`) at broker level and topic 
>> level (to override the broker level default compaction strategy). Please let 
>> me know if we need to keep it in a different naming convention. Note: 
>> Broker level (which will be in server.properties) configuration is 
>> optional and defaults to offset. Topic level configuration will 
>> default to the broker level config...
>>
>> (3) This new way avoids another config parameter, and also in the 
>> future, if any new strategy like header needs additional info, no 
>> additional config is required. As this got discussed already and it was agreed 
>> to have a separate config, I will revert it. KIP updated...
>>
>> (4) Done
>>
>> (5) Updated
>>
>> (6) Updated to pick the first header in the list
>>
>> Please let me know if you have any other questions.
>>
>> Thanks,
>> Senthil
>>
>> -Original Message-
>> From: Matthias J. Sax 
>> Sent: Thursday, October 31, 2019 12:13 AM
>> To: dev@kafka.apache.org
>> Subject: Re: [DISCUSS] KIP-280: Enhanced log compaction
>>
>> Thanks for picking up this KIP, Senthil.
>>
>> (1) As far as I remember, the main issue of the original proposal was 
>> a missing topic level configuration for the compaction strategy. With 
>> this being addressed, I am in favor of this KIP.
>>
>> (2) With regard to (1), it seems we would need a new topic level 
>> config `compaction.strategy`, and `log.cleaner.compaction.strategy` 
>> would be the default strategy (ie, broker level config) if a topic does not 
>> overwrite it?
>>
>> (3) Why did you remove `log.cleaner.compaction.strategy.header`
>> parameter and change the accepted values of 
>> `log.cleaner.compaction.strategy` to "header." instead of keeping 
>> "header"? The original approach seems to be cleaner, and I think this 
>> was discussed on the original discuss thread already.
>>
>> (4) Nit: For the "timestamp" compaction strategy you changed the KIP 
>> to
>>
>> -> `The record [create] timestamp`
>>
>> This is misleading IMHO, because it depends on the broker/log 
>> configuration `(log.)message.timestamp.type` that can either be 
>> `CreateTime` or `LogAppendTime` what the actual record timestamp is. I 
>> would just remove "create" to keep it unspecified.
>>
>> (5) Nit: the section "Public Interfaces" should list the newly 
>> introduced configs -- configuration parameters are a public interface.
>>
>> (6) What do you mean by "first level header lookup"? The term "first 
>> level" indicates some hierarchy, but headers don't have any hierarchy 
>> -- it's just a list of key-value pairs? If you mean the _order_ of the 
>> headers, ie, pick the first header in the list that matches the key, 
>> please rephrase it to make it clearer.
>>
>>
>>
>> @Tom: I agree with all you are saying, however, I still think that 
>> this KIP will improve the overall situation, because everything you 
>> pointed out is actually true with offset based compaction, too.
>>
>> The KIP is not a silver bullet that solves all issue for interleaved 
>> writes, but I personally believe, it's a good improvement.
>>
>>
>>
>> -Matthias
>>
>>
>> On 10/30/19 9:45 AM, Senthilnathan Muthusamy wrote:
>>> Hi,
>>>
>>> Please let me know if anyone has any questions on this updated KIP-280...
>>>
>>> Thanks,
>>>
>>> Senthil
>>>
>>> -Original Message-
>>> From: Senthilnathan Muthusamy 
>>> Sent: Monday, October 28, 2019 11:36 PM
>>> To: dev@kafka.apache.org
>>> Subject: RE: [DISCUSS] KIP-280: Enhanced log compaction
>>>
>>> Hi Tom,
>>>
>>> Sorry for th