[jira] [Created] (KAFKA-8226) New MirrorMaker option partition.to.partition

2019-04-12 Thread JIRA
Ernestas Vaiciukevičius created KAFKA-8226:
--

 Summary: New MirrorMaker option partition.to.partition
 Key: KAFKA-8226
 URL: https://issues.apache.org/jira/browse/KAFKA-8226
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Ernestas Vaiciukevičius


Currently, when MirrorMaker mirrors records with null keys, it shuffles them 
across the destination topic's partitions. Sometimes it is desirable to 
preserve the original partition.

The related PR adds a new command-line option to do that:

When partition.to.partition=true, MirrorMaker retains the partition number when 
mirroring records, even for records without keys. 
When this option is used, the source and destination topics are assumed to have the 
same number of partitions.
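
For illustration only, here is a minimal sketch of what partition preservation amounts 
to when building the mirrored record. This is not the code from the related PR; the 
class, method, and boolean flag are hypothetical stand-ins for the proposed 
partition.to.partition behavior.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch, not the PR's implementation: with the option enabled, the
// mirrored record carries the source partition explicitly; otherwise the partition
// is left null and the producer's partitioner decides (which shuffles null-keyed records).
public final class PartitionPreservingMirror {
    public static ProducerRecord<byte[], byte[]> mirror(ConsumerRecord<byte[], byte[]> source,
                                                        boolean partitionToPartition) {
        Integer partition = partitionToPartition ? source.partition() : null;
        return new ProducerRecord<>(source.topic(), partition, source.key(), source.value());
    }
}

Note that passing an explicit partition only works if the destination topic has at least 
as many partitions as the source, which is why the option assumes equal partition counts.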



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Stream caching

2019-04-12 Thread Ярослав Когут
Hello, yes I know that, I read about RocksDB.

But do we have the ability to set a size limit for this DB? We have alerts in 
Grafana that the application is using more than 80% of the full app memory.

I’ll be very grateful for the help.

Thanks.

Kohut Yaroslav

> On Apr 12, 2019, at 4:02 AM, Guozhang Wang  wrote:
> 
> Hello Kohut,
> 
> Streams's default state.dir location is /tmp/kafka-streams, that may
> explained it.
> 
> For more info about state management of Kafka Streams, you can read
> https://kafka.apache.org/21/documentation/streams/architecture#streams_architecture_state
> 
> 
> Guozhang
> 
> On Thu, Apr 11, 2019 at 1:09 AM Ярослав Когут 
> wrote:
> 
>> Hi guys.
>> 
>> I’ve deployed spring cloud application using docker, service is using
>> KStream functionality, and I have problems with /tmp folder.
>> 
>> On some instances of application /tmp folder is from 3 to 6 GB. Maybe I
>> missed some property?
>> 
>> I will be very grateful for the help.
>> 
>> Thanks, best regards.
>> 
>> Kohut Yaroslav
> 
> 
> 
> -- 
> -- Guozhang



Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2019-04-12 Thread Tom Bentley
Hi Rajini,

I've made a number of changes to the KIP.

1. I've added RequestedTopicState.requestedConfigs(). This is obviously
unrelated to supporting alter broker, but I think it goes some way to
addressing one of the points Anna made last year.
Anna, wdyt?

2. I've added BrokerState, RequestedBrokerState and BrokerManagementPolicy.
These are largely similar to the interfaces for topic management, but the
lifecycle of a BrokerManagementPolicy needs to be different.

That's because a BrokerManagementPolicy ought to be Configurable with the
broker config, but obviously the broker config can change. Because a
cluster-scoped config might be changed via a different broker, we need to
hook into the ZooKeeper change notification on the broker configs to
instantiate a new BrokerManagementPolicy when the broker policy changes. We'd
need to cope with a policy implementation change happening concurrently with
policy enforcement.
And technically there's a race here: sending changes to cluster-scoped
configs to multiple brokers could result in non-deterministic policy
enforcement.

One way to avoid that would be to require changes to cluster-scoped configs
to be sent to the controller.
This complexity is annoying because it seems likely that many policy
implementations won't _actually_ depend on the broker config.

Thoughts?

Kind regards,

Tom

On Wed, Apr 10, 2019 at 9:48 AM Rajini Sivaram 
wrote:

> Thanks Tom.
>
> Once you have updated the KIP to support broker config updates, it may be
> good to start a new vote thread since the other one is quite old and
> perhaps the KIP has changed since then.
>
>
> On Wed, Apr 10, 2019 at 3:58 AM Tom Bentley  wrote:
>
> > Hi Rajini,
> >
> > I'd be happy to do that. I'll try to get it done in the next few days.
> >
> > Although there's been quite a lot of interest this, the vote thread never
> > got any binding +1, so it's been stuck in limbo for a long time. It would
> > be great to get this moving again.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Tue, Apr 9, 2019 at 3:04 PM Rajini Sivaram 
> > wrote:
> >
> > > Hi Tom,
> > >
> > > Are you planning to extend this KIP to also include dynamic broker
> config
> > > update (currently covered under AlterConfigPolicy)?
> > >
> > > May be worth sending another note to make progress on this KIP since it
> > has
> > > been around a while and reading through the threads, it looks like
> there
> > > has been a lot of interest in it.
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
> > >
> > > On Wed, Jan 9, 2019 at 11:25 AM Tom Bentley 
> > wrote:
> > >
> > > > Hi Anna and Mickael,
> > > >
> > > > Anna, did you have any comments about the points I made?
> > > >
> > > > Mickael, we really need the vote to be passed before there's even any
> > > work
> > > > to do. With the exception of Ismael, the KIP didn't seem to get the
> > > > attention of any of the other committers.
> > > >
> > > > Kind regards,
> > > >
> > > > Tom
> > > >
> > > > On Thu, 13 Dec 2018 at 18:11, Tom Bentley 
> > wrote:
> > > >
> > > > > Hi Anna,
> > > > >
> > > > > Firstly, let me apologise again about having missed your previous
> > > emails
> > > > > about this.
> > > > >
> > > > > Thank you for the feedback. You raise some valid points about
> > > ambiguity.
> > > > > The problem with pulling the metadata into CreateTopicRequest and
> > > > > AlterTopicRequest is that you lose the benefit of being able to
> eaily
> > > > write
> > > > > a common policy across creation and alter cases. For example, with
> > the
> > > > > proposed design the policy maker could write code like this
> (forgive
> > my
> > > > > pseudo-Java)
> > > > >
> > > > > public void validateCreateTopic(requestMetadata, ...) {
> > > > > commonPolicy(requestMetadata.requestedState());
> > > > >   }
> > > > >
> > > > >   public void validateAlterTopic(requestMetadata, ...) {
> > > > > commonPolicy(requestMetadata.requestedState());
> > > > >   }
> > > > >
> > > > >   private void commonPolicy(RequestedTopicState requestedState) {
> > > > > // ...
> > > > >   }
> > > > >
> > > > > I think that's an important feature of the API because (I think)
> very
> > > > > often the policy maker is interested in defining the universe of
> > > > prohibited
> > > > > configurations without really caring about whether the request is a
> > > > create
> > > > > or an alter. Having a single RequestedTopicState for both create
> and
> > > > > alter means they can do that trivially in one place. Having
> different
> > > > > methods in the two Request classes prevents this and forces the
> > policy
> > > > > maker to pick apart the different requestState objects before
> calling
> > > any
> > > > > common method(s).
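
(A hedged, compilable rendering of the pseudo-Java above, just to make the shape of a
shared create/alter policy concrete. The interface and method names below are
illustrative stand-ins, not KIP-201's exact API.)

import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;

// Illustrative only -- these names are stand-ins, not the interfaces proposed in KIP-201.
interface RequestedTopicState {
    int numPartitions();
    short replicationFactor();
    Map<String, String> requestedConfigs();
}

class ExampleTopicManagementPolicy {
    public void validateCreateTopic(RequestedTopicState requestedState) {
        commonPolicy(requestedState);
    }

    public void validateAlterTopic(RequestedTopicState requestedState) {
        commonPolicy(requestedState);
    }

    // One rule shared by create and alter, which is the point being made above.
    private void commonPolicy(RequestedTopicState requestedState) {
        if (requestedState.replicationFactor() < 2) {
            throw new PolicyViolationException("topics must have a replication factor of at least 2");
        }
    }
}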
> > > > >
> > > > > I think my intention at the time (and it's many months ago now, so
> I
> > > > might
> > > > > not have remembered fully) was that RequestedTopicState would
> > basically
> > > > > represent what the topic would look like after the requested
> changes
> > > were
> > > > > applied

Re: Stream caching

2019-04-12 Thread Ярослав Когут
And I have a lot of warnings like the ones described in this Jira:

https://issues.apache.org/jira/browse/KAFKA-5998?attachmentOrder=asc 


Thanks.

Kohut Yaroslav

> On Apr 12, 2019, at 10:59 AM, Ярослав Когут  wrote:
> 
> Hello, yes I know it, I read about rocks db.
> 
> But do we have ability to make some size limitation for this db? Because we 
> have alerts in grafana, that application is using more then 80% of full app 
> memory.
> 
> I’ll be very grateful for the help.
> 
> Thanks.
> 
> Kohut Yaroslav
> 
>> On Apr 12, 2019, at 4:02 AM, Guozhang Wang  wrote:
>> 
>> Hello Kohut,
>> 
>> Streams's default state.dir location is /tmp/kafka-streams, that may
>> explained it.
>> 
>> For more info about state management of Kafka Streams, you can read
>> https://kafka.apache.org/21/documentation/streams/architecture#streams_architecture_state
>> 
>> 
>> Guozhang
>> 
>> On Thu, Apr 11, 2019 at 1:09 AM Ярослав Когут 
>> wrote:
>> 
>>> Hi guys.
>>> 
>>> I’ve deployed spring cloud application using docker, service is using
>>> KStream functionality, and I have problems with /tmp folder.
>>> 
>>> On some instances of application /tmp folder is from 3 to 6 GB. Maybe I
>>> missed some property?
>>> 
>>> I will be very grateful for the help.
>>> 
>>> Thanks, best regards.
>>> 
>>> Kohut Yaroslav
>> 
>> 
>> 
>> -- 
>> -- Guozhang
> 



[DISCUSS] KIP-452: Tool to view cluster status

2019-04-12 Thread Łukasz Antoniak
Dear all,

I would like to start the discussion of this KIP, which aims to provide a
command-line tool to display basic information about active Kafka brokers.

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-452%3A+Tool+to+view+cluster+status
JIRA: https://issues.apache.org/jira/browse/KAFKA-6393

Please take a look and share any thoughts you have regarding presented
information, API changes, etc.

Best regards,
Lukasz


Re: Proposal to Auto Close Inactive Tickets

2019-04-12 Thread Sönke Liebau
Hi Addison,

in general, I am totally in favor of closing unnecessary tickets! However,
over the past few weeks, I have spent quite a bit of time looking at old
tickets and evaluating whether those should be closed or are still
relevant. For a surprising number of these tickets, I've found that there
might actually still be something useful to do (the majority was easily
closeable though).

I totally agree that a cleanup makes sense, but I am a bit hesitant about
doing it automatically: even if no one feels responsible for a ticket
anymore, there may still be merit to it.

So personally I'd prefer a concerted cleanup effort to an automated
solution - but that is just my opinion :)

We have just discussed the Jira workflow in the Apache Training project as
well and agreed on a workflow that has an initial "triage" state instead of
moving tickets directly to "open" - which should serve as an initial check of
whether the ticket is "valid" or something better suited to the mailing list,
not an issue, ...
Something similar might be an option to help keep Jira "clean" after an
initial cleanup effort.

Best regards,
Sönke

On Thu, 11 Apr 2019 at 20:55, Addison Huddy 
wrote:

> Hi Kafka Developers,
>
> The Apache Kafka JIRA currently has 2138 open JIRA tickets. As Charlie
> Munger  once said,
> “Simplicity has a way of improving performance through enabling us to
> better understand what we are doing.”
>
> What are everyone’s thoughts on adopting what the k8s community is doing
> and auto close any ticket that has not seen any updates for 90 days.
>
>
> https://github.com/kubernetes/community/blob/master/contributors/devel/automation.md
>
> Prow will close pull-requests that don't have human activity in the last 90 
> days. It will warn about this process 60 days before closing the 
> pull-request, and warn again 30 days later. One way to prevent this from 
> happening is to add the lifecycle/frozen label on the pull-request.
>
> If we were to adopt this practice, we could reduce our open ticket count
> to 553, a 74% decrease.
> project = KAFKA AND resolution = Unresolved AND updated >= "-90d" ORDER BY
> created DESC
>
> So how might this work?
>
>- a bot, let’s call it Bender, would ping the ticket reporter after 30
>days of inactivity
>- After 60 days, Bender would again ping the reporter warning them
>that the ticket will be closed due to inactivity
>- After 90 days of inactivity, bender would resolve the ticket with
>the status Auto Closed and a comment that the ticket was resolved due to
>inactivity.
>- Bender would ignore all tickets with the label bender-ignore
>
>
> [image: image.png]
>
> Let me know what you think?
>
> \ah
>
>


Re: Stream caching

2019-04-12 Thread Guozhang Wang
Hello Kohut,

Is your issue about disk usage, or memory? The first email seems to
indicate disk (there's no direct way to bound the disk usage of state
stores, though you can set e.g. the retention period to indirectly control disk
usage by specifying how long you want to retain windowed stores), while
the latter seems to indicate memory (btw maybe this can help a bit:
https://kafka.apache.org/documentation/streams/developer-guide/memory-mgmt.html
)


Guozhang
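
To make the memory side concrete, here is a rough sketch of a RocksDBConfigSetter that
caps the memtable and block-cache sizes per store, along the lines of the
memory-management page linked above. The sizes used are arbitrary examples, not
recommendations, and tuning them is workload-dependent.

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Rough sketch with arbitrary example sizes: limit memtable and block-cache memory
// per RocksDB store. Register it via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        options.setWriteBufferSize(8 * 1024 * 1024L);      // 8 MB per memtable
        options.setMaxWriteBufferNumber(2);                // at most 2 memtables per store
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);  // 16 MB block cache per store
        options.setTableFormatConfig(tableConfig);
    }
}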

On Fri, Apr 12, 2019 at 2:49 AM Ярослав Когут 
wrote:

> And I have a lot of warning that describe in this lira
>
> https://issues.apache.org/jira/browse/KAFKA-5998?attachmentOrder=asc <
> https://issues.apache.org/jira/browse/KAFKA-5998?attachmentOrder=asc>
>
> Thanks.
>
> Kohut Yaroslav
>
> > On Apr 12, 2019, at 10:59 AM, Ярослав Когут 
> wrote:
> >
> > Hello, yes I know it, I read about rocks db.
> >
> > But do we have ability to make some size limitation for this db? Because
> we have alerts in grafana, that application is using more then 80% of full
> app memory.
> >
> > I’ll be very grateful for the help.
> >
> > Thanks.
> >
> > Kohut Yaroslav
> >
> >> On Apr 12, 2019, at 4:02 AM, Guozhang Wang  wrote:
> >>
> >> Hello Kohut,
> >>
> >> Streams's default state.dir location is /tmp/kafka-streams, that may
> >> explained it.
> >>
> >> For more info about state management of Kafka Streams, you can read
> >>
> https://kafka.apache.org/21/documentation/streams/architecture#streams_architecture_state
> >>
> >>
> >> Guozhang
> >>
> >> On Thu, Apr 11, 2019 at 1:09 AM Ярослав Когут 
> >> wrote:
> >>
> >>> Hi guys.
> >>>
> >>> I’ve deployed spring cloud application using docker, service is using
> >>> KStream functionality, and I have problems with /tmp folder.
> >>>
> >>> On some instances of application /tmp folder is from 3 to 6 GB. Maybe I
> >>> missed some property?
> >>>
> >>> I will be very grateful for the help.
> >>>
> >>> Thanks, best regards.
> >>>
> >>> Kohut Yaroslav
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >
>
>

-- 
-- Guozhang


Re: [VOTE] KIP-411: Make default Kafka Connect worker task client IDs distinct

2019-04-12 Thread Paul Davidson
Just a reminder that KIP-411 is open for voting. No votes received yet!

On Fri, Apr 5, 2019 at 9:07 AM Paul Davidson 
wrote:

> Hi all,
>
> Since we seem to have agreement in the discussion I would like to start
> the vote on KIP-411.
>
> See:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
>
> Also see the related PR: https://github.com/apache/kafka/pull/6097
>
> Thanks to everyone who contributed!
>
> Paul
>


Re: Stream caching

2019-04-12 Thread Ярослав Когут
Hello Guozhang!

I’m talking about disk usage, about /tmp/kafka-streams.

My application merges 3 topics, one of which has 120 partitions.

As I understood from the 
documentation (https://docs.confluent.io/current/streams/sizing.html), disk 
usage depends on the partition count. And I really want to decrease the size of the 
tmp folder, or at least control it.

Thank you.

Kohut Yaroslav
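
As a hedged illustration of the retention knob Guozhang mentions in his reply: for
windowed aggregations, the store retention (and hence the on-disk footprint under
state.dir) can be shortened via Materialized.withRetention. The window size, grace
period, and retention below are arbitrary example values, not recommendations.

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

// Sketch with arbitrary durations: a 5-minute windowed count whose store only retains
// 30 minutes of data, which bounds how much the window store keeps on disk.
class RetentionExample {
    static void buildWindowedCount(KStream<String, String> input) {
        input.groupByKey()
             .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(10)))
             .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("bounded-count-store")
                                .withRetention(Duration.ofMinutes(30)));
    }
}

The retention must be at least the window size plus the grace period, which is why the
grace period is set explicitly here.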

> On Apr 12, 2019, at 7:08 PM, Guozhang Wang  wrote:
> 
> Hello Kohut,
> 
> Is your issue about disk usage, or memory? The first email seems to
> indicate disks (there's no direct way for bounding the disk usage of state
> stores, though you can set e.g. retention period to indirectly control disk
> usage via specifying how long do you want to retain windowed stores), while
> the later seems to indicate memory (btw maybe this can help a bit:
> https://kafka.apache.org/documentation/streams/developer-guide/memory-mgmt.html
>  
> 
> )
> 
> 
> Guozhang
> 
> On Fri, Apr 12, 2019 at 2:49 AM Ярослав Когут  >
> wrote:
> 
>> And I have a lot of warning that describe in this lira
>> 
>> https://issues.apache.org/jira/browse/KAFKA-5998?attachmentOrder=asc 
>>  <
>> https://issues.apache.org/jira/browse/KAFKA-5998?attachmentOrder=asc 
>> >
>> 
>> Thanks.
>> 
>> Kohut Yaroslav
>> 
>>> On Apr 12, 2019, at 10:59 AM, Ярослав Когут 
>> wrote:
>>> 
>>> Hello, yes I know it, I read about rocks db.
>>> 
>>> But do we have ability to make some size limitation for this db? Because
>> we have alerts in grafana, that application is using more then 80% of full
>> app memory.
>>> 
>>> I’ll be very grateful for the help.
>>> 
>>> Thanks.
>>> 
>>> Kohut Yaroslav
>>> 
>>>> On Apr 12, 2019, at 4:02 AM, Guozhang Wang  wrote:
>>>> 
>>>> Hello Kohut,
>>>> 
>>>> Streams's default state.dir location is /tmp/kafka-streams, that may
>>>> explained it.
>>>> 
>>>> For more info about state management of Kafka Streams, you can read
>>>> 
>>>> https://kafka.apache.org/21/documentation/streams/architecture#streams_architecture_state
>>>> 
>>>> 
>>>> Guozhang
>>>> 
>>>> On Thu, Apr 11, 2019 at 1:09 AM Ярослав Когут 
>>>> wrote:
>>>> 
>>>>> Hi guys.
>>>>> 
>>>>> I’ve deployed spring cloud application using docker, service is using
>>>>> KStream functionality, and I have problems with /tmp folder.
>>>>> 
>>>>> On some instances of application /tmp folder is from 3 to 6 GB. Maybe I
>>>>> missed some property?
>>>>> 
>>>>> I will be very grateful for the help.
>>>>> 
>>>>> Thanks, best regards.
>>>>> 
>>>>> Kohut Yaroslav
>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
>>> 
>> 
>> 
> 
> -- 
> -- Guozhang



Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-04-12 Thread Paul Davidson
Hi everyone. Just a reminder that KIP-411 is now open for voting. No votes
received yet!

Thanks,

Paul

On Thu, Apr 4, 2019 at 9:09 AM pdavidson  wrote:

> Thanks Randall.  You're absolutely right that Worker creates the clients
> before passing them to the tasks, so I'm very happy with your changes.
>
> Paul
>
> On Thu, Apr 4, 2019 at 8:02 AM Randall Hauch  wrote:
>
>> Sounds great.
>>
>> I did make a few minor grammatical edits to the "Proposed Changes" section
>> to avoid the notion that the sink and source tasks create the consumers
>> and
>> producers, respectively. I think it's important to accurately denote that
>> the framework creates the producers and consumers for the tasks. (This in
>> no way changes the proposal at all, and feel free to roll back if you
>> disagree with the changes. I felt it was easier to change than to
>> explain.)
>>
>> Looking forward to a vote.
>>
>> Best regards,
>>
>> Randall
>>
>> On Wed, Apr 3, 2019 at 6:49 PM pdavidson > .invalid>
>> wrote:
>>
>> > Thanks Randall, I updated the proposal as suggested. Let me know if any
>> > other changes need to be made, otherwise I think the KIP-411 proposal is
>> > ready to finalize.  I will aim to call a vote on Friday.
>> >
>> > On Mon, Mar 25, 2019 at 7:12 AM Ryanne Dolan 
>> > wrote:
>> >
>> > > Randall, Paul, the proposal looks great, thanks.
>> > >
>> > > Ryanne
>> > >
>> > > On Mon, Mar 25, 2019, 9:03 AM Randall Hauch  wrote:
>> > >
>> > > > Paul,
>> > > >
>> > > > Thanks for updating the KIP with the proposal. I do think the KIP
>> > should
>> > > at
>> > > > least mention that the prior behavior is to allow the worker to
>> > override
>> > > > the `producer.client.id` or `consumer.client.id`, which is entirely
>> > > > possible (though unlikely since there would be an MBean conflict, as
>> > > > pointed out in the discussion). It might be sufficient to just add a
>> > > > sentence to the "Compatibility, Deprecation, and Migration Plan"
>> > section,
>> > > > like "Any client IDs specified in the worker configuration via `
>> > > > producer.client.id` or `consumer.client.id` properties will be
>> > > unchanged,
>> > > > as those will take precedence." Thoughts?
>> > > >
>> > > > Ryanne,
>> > > >
>> > > > IIUC your last message, I think the latest KIP proposal will align
>> > pretty
>> > > > closely with your suggestion. Can you review and confirm?
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Randall
>> > > >
>> > > > On Fri, Mar 1, 2019 at 3:04 PM Ryanne Dolan 
>> > > wrote:
>> > > >
>> > > > > Paul, Randall, I don't think most people will care to exercise so
>> > much
>> > > > > control over the client IDs, so long as they are filled in
>> > > automatically
>> > > > in
>> > > > > a way that eliminates duplicate metrics and remains somewhat
>> legible.
>> > > If
>> > > > we
>> > > > > let the user specify a pattern or something, we're really just
>> making
>> > > the
>> > > > > user worry about these requirements.
>> > > > >
>> > > > > For example, if they specify "foo" as the client.id, they'll get
>> a
>> > > bunch
>> > > > > of
>> > > > > exceptions about that MBean already existing. So they'll try
>> > > > > "${connectorName}-foo", which won't work because connectors that
>> get
>> > > > > restarted will re-use the same client ID and the same MBean again.
>> > And
>> > > so
>> > > > > on, until they end up solving the same problem we are trying to
>> solve
>> > > > here.
>> > > > >
>> > > > > I think you at least need something like
>> > > "connect--producer-dlq"
>> > > > to
>> > > > > avoid MBeans being re-registered within the same JVM. I believe
>> the
>> > > task
>> > > > ID
>> > > > > is based on the connector name, so you'd get e.g.
>> > > > > "connect-myconnector-1-producer".
>> > > > >
>> > > > > Ryanne
>> > > > >
>> > > > >
>> > > > > On Fri, Mar 1, 2019 at 12:44 PM Paul Davidson
>> > > > >  wrote:
>> > > > >
>> > > > > > Thanks Randall.  I like your suggestion: as you say, this would
>> > make
>> > > it
>> > > > > > possible to usefully override the default client id properties.
>> > > > > >
>> > > > > > I'm not sure how we would handle the dead-letter queue case
>> though
>> > -
>> > > > > maybe
>> > > > > > we could automatically add a "dlq-" prefix to the producer
>> client
>> > id?
>> > > > > >
>> > > > > > If there is agreement on this change I will update the KIP and
>> the
>> > PR
>> > > > > (when
>> > > > > > I find some time).
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch > >
>> > > > wrote:
>> > > > > >
>> > > > > > > Hi, Paul. Thanks for the update to KIP-411 to reflect adding
>> > > > defaults,
>> > > > > > and
>> > > > > > > creating/updating https://github.com/apache/kafka/pull/6097
>> to
>> > > > reflect
>> > > > > > > this
>> > > > > > > approach.
>> > > > > > >
>> > > > > > > Now that we've avoided adding a new config and have changed
>> the
>> > > > > default `
>> > > > > > > client.id` to include some context, the connector name, and
>> task
>> > 

Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-12 Thread Guozhang Wang
On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman 
wrote:

> Thanks for the comments Guozhang! I've answered your questions below
>
> On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang  wrote:
>
> > Hi Sophie,
> >
> > Thanks for the proposed KIP. I've made a pass over it and here are some
> > thoughts:
> >
> > 1. "The window size is effectively the grace and retention period". The
> > grace time is defined as "the time to admit late-arriving events after
> the
> > end of the window." hence it is the additional time beyond the window
> size.
> > I guess your were trying to say it should be zero?
> >
> > Also for retention period, it is not a notion of the window spec any
> more,
> > but only for the window store itself. So I'd suggest talking about window
> > size here, and note that store retention time cannot be controlled via
> > window spec at all.
> >
>
> Yes, I meant to say the grace period is effectively zero -- the retention
> period will ultimately be the same as the window size, which is
> configurable, but it can't be configured independently if that's what you
> mean?
>
>
You can configure retention via Materialized (in the DSL), when specifying the
store in KTable, or via WindowStoreBuilder#retentionPeriod (in the PAPI). The
point here is that they are specified independently of the Windows spec.
So a user cannot control how long a materialized store is retained from
the window spec itself; she must do that via the methods mentioned above.


>
> > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> > bucket, so I'd assume you will expire one bucket as a whole when its end
> > time is smaller than the current window's starting time, right?
> >
>
> Since this design assumes we don't have a subtracter, each bucket would
> expire when it's start time is outside the current window; the remaining
> values in that bucket are then aggregated with the "running aggregate" of
> the next bucket to get the total aggregate for the entire window. I'll try
> to come up with a diagram and/or better way to explain what I have in mind
> here...
> (The values themselves in the buckets will expire automatically by setting
> the retention period of the underlying window store)
>
>
> > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > configurable parameter exposed to users or is it abstracted away and only
> > being selected internally?
> >
>
> Good question. If we ignore the difference in cost between aggregation
> operations and writes to the underlying store, the optimal value of M is
> sqrt(N). But the reality is the aggregation might be very simple vs
> expensive RocksDB writes -- conversely the aggregation itself could be
> complicated/costly while the underlying store is cheap to write  (ie
> in-memory). I do feel it should be abstracted away from the user however
> and not an additional parameter they need to consider and tune (eg
> segmentInterval) ... some profiling would probably be needed to determine a
> reasonable choice
>
>
> > 4. "There is some tradeoff between purely optimizing " seems incomplete
> > paragraph?
> >
>
> Whoops
>
>
> > 5. Meta comment: for many aggregations it is commutative and associative
> so
> > we can require users to pass in a "substract" function as well. Given
> these
> > two function I think we can propose two set of APIs, 1) with the adder
> and
> > subtractor and 2) with the added only (if the aggregate logic is not
> comm.
> > and assoc.).
> >
> > We just maintain an aggregate value for each bucket (call it
> > bucket_aggregate) plus for the whole window (call it total_aggregate),
> i.e.
> > at most M + 1 values per key. We use the total_aggregate for queries, and
> > each update will cause 2 writes (to the bucket and to the total
> aggregate).
> >
> > And with 1) when expiring the oldest bucket we simply call
> > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > oldest bucket we can re-compute the total_aggregate by
> > sum(bucket_aggregate) over other buckets again.
> >
>
> This is a good point, ie we can definitely be much smarter in our design if
> we have a subtracter, in which case it's probably worth separate sets of
> APIs/implementations based on what the user can provide. I'll work this
> into the KIP
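
(A hedged sketch of the bucket-plus-running-total idea from point 5, purely to make the
mechanics concrete. It is not the API or implementation proposed in the KIP; the class
and method names are invented for illustration.)

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Invented illustration of point 5 above: one aggregate per bucket plus a running
// total; reads are O(1) from the total, and expiring a bucket is a single subtract.
final class BucketedSlidingAggregate<V, A> {
    private final BiFunction<A, V, A> adder;        // folds a value into an aggregate
    private final BiFunction<A, A, A> subtractor;   // removes a bucket aggregate from the total
    private final Supplier<A> initializer;
    private final Deque<A> buckets = new ArrayDeque<>();   // oldest bucket first
    private A total;

    BucketedSlidingAggregate(BiFunction<A, V, A> adder,
                             BiFunction<A, A, A> subtractor,
                             Supplier<A> initializer) {
        this.adder = adder;
        this.subtractor = subtractor;
        this.initializer = initializer;
        this.total = initializer.get();
        this.buckets.addLast(initializer.get());
    }

    // An in-order update lands in the newest bucket: two writes, bucket and total.
    void add(V value) {
        buckets.addLast(adder.apply(buckets.removeLast(), value));
        total = adder.apply(total, value);
    }

    // Called when stream time crosses a bucket boundary.
    void startNewBucket() {
        buckets.addLast(initializer.get());
    }

    // Called when the oldest bucket falls entirely outside the sliding window.
    void expireOldestBucket() {
        total = subtractor.apply(total, buckets.removeFirst());
    }

    A windowAggregate() {
        return total;
    }
}

For example, a count would use (agg, v) -> agg + 1 as the adder, (a, b) -> a - b as the
subtractor, and () -> 0L as the initializer.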
>
>
> > 6. Meta comment: it is reasonable to assume in practice out-of-ordering
> > data is not very common, hence most of the updates will be falling into
> the
> > latest bucket. So I'm wondering if it makes sense to always store the
> first
> > bucket in memory while making other buckets optionally on persistent
> > storage. In practice, as long as M is large enough (we probably need it
> to
> > be large enough to have sufficiently sensitive expiration anyways) then
> > each bucket's aggregate data is small enough to be in memory.
> >
>
> This sounds reasonable to me (looking into the future, if we want to
> eventually support a way to "tune" the total memory usage by Streams this
> could be tur

Re: [VOTE] KIP-417: Allow JmxTool to connect to a secured RMI port

2019-04-12 Thread lisiyuan
+1 (binding).

Thanks.

Li 


On 2019/01/29 01:37:43, "Fangbin Sun"  wrote: 
> Hi, All:
> I would like to start a vote on KIP-417 which aims at supporting JmxTool to 
> connect to a secured RMI port.
> 
> 
> The KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-417%3A+Allow+JmxTool+to+connect+to+a+secured+RMI+port
> 
> 
> 
> Thanks!
> Fangbin



Re: [VOTE] KIP-417: Allow JmxTool to connect to a secured RMI port

2019-04-12 Thread lisiyuan
+1 (binding).

Thanks.

Li 

On 2019/02/01 16:20:11, Harsha Chintalapani  wrote: 
> +1 (binding).
> 
> Thanks,
> Harsha
> On Jan 31, 2019, 8:08 PM -0800, Manikumar , wrote:
> > Hi,
> >
> > +1 (binding). Thanks for the KIP.
> >
> > On Mon, Jan 28, 2019 at 5:37 PM Fangbin Sun  wrote:
> >
> > > Hi, All:
> > > I would like to start a vote on KIP-417 which aims at supporting JmxTool
> > > to connect to a secured RMI port.
> > >
> > >
> > > The KIP:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-417%3A+Allow+JmxTool+to+connect+to+a+secured+RMI+port
> > >
> > >
> > > Thanks!
> > > Fangbin
> 



[jira] [Created] (KAFKA-8227) Missing Links in Duality of Streams and Tables section

2019-04-12 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-8227:
--

 Summary: Missing Links in Duality of Streams and Tables section
 Key: KAFKA-8227
 URL: https://issues.apache.org/jira/browse/KAFKA-8227
 Project: Kafka
  Issue Type: Bug
  Components: documentation, streams
Reporter: Bill Bejeck


In the 
[https://kafka.apache.org/21/documentation/streams/core-concepts#streams-concepts-duality]
 section there are several terms that are emphasized but should be links:
 # streams 
 # tables 
 # elastic 
 # fault-tolerant stateful processing 
 # interactive queries 
 # aggregations 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-411: Add option to make Kafka Connect task client ID values unique

2019-04-12 Thread Ryanne Dolan
Thanks for the reminder.

+1 (non-binding)

Ryanne

On Fri, Apr 12, 2019, 11:13 AM Paul Davidson
 wrote:

> Hi everyone. Just a reminder that KIP-411 is now open for voting. No votes
> received yet!
>
> Thanks,
>
> Paul
>
> On Thu, Apr 4, 2019 at 9:09 AM pdavidson  wrote:
>
> > Thanks Randall.  You're absolutely right that Worker creates the clients
> > before passing them to the tasks, so I'm very happy with your changes.
> >
> > Paul
> >
> > On Thu, Apr 4, 2019 at 8:02 AM Randall Hauch  wrote:
> >
> >> Sounds great.
> >>
> >> I did make a few minor grammatical edits to the "Proposed Changes"
> section
> >> to avoid the notion that the sink and source tasks create the consumers
> >> and
> >> producers, respectively. I think it's important to accurately denote
> that
> >> the framework creates the producers and consumers for the tasks. (This
> in
> >> no way changes the proposal at all, and feel free to roll back if you
> >> disagree with the changes. I felt it was easier to change than to
> >> explain.)
> >>
> >> Looking forward to a vote.
> >>
> >> Best regards,
> >>
> >> Randall
> >>
> >> On Wed, Apr 3, 2019 at 6:49 PM pdavidson  >> .invalid>
> >> wrote:
> >>
> >> > Thanks Randall, I updated the proposal as suggested. Let me know if
> any
> >> > other changes need to be made, otherwise I think the KIP-411 proposal
> is
> >> > ready to finalize.  I will aim to call a vote on Friday.
> >> >
> >> > On Mon, Mar 25, 2019 at 7:12 AM Ryanne Dolan 
> >> > wrote:
> >> >
> >> > > Randall, Paul, the proposal looks great, thanks.
> >> > >
> >> > > Ryanne
> >> > >
> >> > > On Mon, Mar 25, 2019, 9:03 AM Randall Hauch 
> wrote:
> >> > >
> >> > > > Paul,
> >> > > >
> >> > > > Thanks for updating the KIP with the proposal. I do think the KIP
> >> > should
> >> > > at
> >> > > > least mention that the prior behavior is to allow the worker to
> >> > override
> >> > > > the `producer.client.id` or `consumer.client.id`, which is
> entirely
> >> > > > possible (though unlikely since there would be an MBean conflict,
> as
> >> > > > pointed out in the discussion). It might be sufficient to just
> add a
> >> > > > sentence to the "Compatibility, Deprecation, and Migration Plan"
> >> > section,
> >> > > > like "Any client IDs specified in the worker configuration via `
> >> > > > producer.client.id` or `consumer.client.id` properties will be
> >> > > unchanged,
> >> > > > as those will take precedence." Thoughts?
> >> > > >
> >> > > > Ryanne,
> >> > > >
> >> > > > IIUC your last message, I think the latest KIP proposal will align
> >> > pretty
> >> > > > closely with your suggestion. Can you review and confirm?
> >> > > >
> >> > > > Best regards,
> >> > > >
> >> > > > Randall
> >> > > >
> >> > > > On Fri, Mar 1, 2019 at 3:04 PM Ryanne Dolan <
> ryannedo...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Paul, Randall, I don't think most people will care to exercise
> so
> >> > much
> >> > > > > control over the client IDs, so long as they are filled in
> >> > > automatically
> >> > > > in
> >> > > > > a way that eliminates duplicate metrics and remains somewhat
> >> legible.
> >> > > If
> >> > > > we
> >> > > > > let the user specify a pattern or something, we're really just
> >> making
> >> > > the
> >> > > > > user worry about these requirements.
> >> > > > >
> >> > > > > For example, if they specify "foo" as the client.id, they'll
> get
> >> a
> >> > > bunch
> >> > > > > of
> >> > > > > exceptions about that MBean already existing. So they'll try
> >> > > > > "${connectorName}-foo", which won't work because connectors that
> >> get
> >> > > > > restarted will re-use the same client ID and the same MBean
> again.
> >> > And
> >> > > so
> >> > > > > on, until they end up solving the same problem we are trying to
> >> solve
> >> > > > here.
> >> > > > >
> >> > > > > I think you at least need something like
> >> > > "connect--producer-dlq"
> >> > > > to
> >> > > > > avoid MBeans being re-registered within the same JVM. I believe
> >> the
> >> > > task
> >> > > > ID
> >> > > > > is based on the connector name, so you'd get e.g.
> >> > > > > "connect-myconnector-1-producer".
> >> > > > >
> >> > > > > Ryanne
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Mar 1, 2019 at 12:44 PM Paul Davidson
> >> > > > >  wrote:
> >> > > > >
> >> > > > > > Thanks Randall.  I like your suggestion: as you say, this
> would
> >> > make
> >> > > it
> >> > > > > > possible to usefully override the default client id
> properties.
> >> > > > > >
> >> > > > > > I'm not sure how we would handle the dead-letter queue case
> >> though
> >> > -
> >> > > > > maybe
> >> > > > > > we could automatically add a "dlq-" prefix to the producer
> >> client
> >> > id?
> >> > > > > >
> >> > > > > > If there is agreement on this change I will update the KIP and
> >> the
> >> > PR
> >> > > > > (when
> >> > > > > > I find some time).
> >> > > > > >
> >> > > > > >
> >> > > > > > On Thu, Feb 21, 2019 at 8:12 AM Randall Hauch <
> rha...@gmail.com
> >> >
> >> > > > wrote:
> 

Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-12 Thread Ying Zheng
Hi Ismael,

Those are just examples. I think administrators should be able to block
certain client libraries for whatever reason. Some other possible reasons
include forcing users to checkpoint in Kafka instead of ZooKeeper, or
forbidding an old Go (Sarama) client library which is known to have some
serious bugs.

message.downconversion.enable does not solve our problems. We are now
planning to upgrade to message format V3, and force users to upgrade to
Kafka 1.x clients. With the proposed min.api.version setting, in case
there is anything wrong, we can roll back the setting. If we upgrade the
message format, there is no way to roll back (Kafka doesn't support downgrading
the message format).

On Thu, Apr 11, 2019 at 7:05 PM Ismael Juma  wrote:

> Hi Ying,
>
> It looks to me that all the examples given in the KIP can be handled with
> the existing "message.downconversion.enable" config and by configuring the
> message format to be the latest:
>
> 1. Kafka 8 / 9 / 10 consumer hangs when the message contains message header
> > ( KAFKA-6739 - Down-conversion fails for records with headers RESOLVED  )
> > 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 ( KAFKA-3160 -
> > Kafka LZ4 framing code miscalculates header checksum RESOLVED  )
> > 3. Performance penalty of converting message format from V3 to V1 or V2
> > for the old consumers (KIP-31 - Move to relative offsets in compressed
> > message sets)
>
>
> Am I missing something? Are there other examples that are not related to
> message conversion?
>
> Ismael
>
> On Thu, Apr 11, 2019 at 11:53 PM Ying Zheng 
> wrote:
>
> > Hi here,
> >
> > Please vote for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
> >
> > Thank you!
> >
>


Re: [DISCUSS] KIP-449: Add connector contexts to Connect worker logs

2019-04-12 Thread Konstantine Karantasis
Thanks for the KIP Randall.

It might not be obvious right away, but this is a great improvement when
running Connect with multiple connectors or when debugging Connect, for
instance in integration tests or system tests. KIP looks good to me
overall, I have just a few comments below:

1. In the config snippet, because an uncommented line is included, it may
not be immediately clear that this line already exists in
connect-log4j.properties and is not an addition. Should this be mentioned in a
separate code block or in a different way?

2. Currently when adding the taskId to a connector or connector task, we
precede it with a dash (-). I feel this addition would make it easier to
parse the taskId visually as well as with parsing tools (for example I can
use `-` as a delimiter easier than 'k' or even a multi-character delimiter
such as 'task'). Do you think it's an overhead?

3. I think a specific mention to the convention used on the base-index for
taskIds would be useful. For example, taskId equal to zero in most cases
represents the (source/sink) connector, while taskId > 0 corresponds to
(source/sink) tasks. By reading the KIP, I assume that task0 (or task-0 if
you agree with my previous comment) will be the connector. Is this the case?

4. Should the example include a snippet for taskId > 0?

5. Do you intend to enable connector MDC in tests? In this case how would
you distinguish between multiple workers in integrations tests? Do you
intend to address this here, in the PR corresponding to this KIP or follow
up later?

6. Related to my previous comment (5). The property 'client.id' is already
used to identify a Connect worker. Have you thought of offering the ability
to identify the worker MDC if this property is set, keeping at the same
time with MDC the ability to not include it as identifier?

Finally, I'd suggest removing text placeholders. I think it will be easier
to follow the KIP. Currently I see placeholder text below Contents and in
Rejected alternatives.

And my least favorite section, but I can't help it :)
Typos/grammar:
to understand what (is) happening within the worker,
Kafka Connect use(s) ...
quotes maybe not needed, text already verbatim in
`config/connect-log4j.properties`

Best,
Konstantine
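
For readers less familiar with the mechanism the KIP builds on, here is a hedged sketch
of how a per-connector logging context can be set via SLF4J's MDC. This is not Connect's
actual worker code, and the key name and format string are taken as illustrative from the
KIP's examples; the log4j pattern would then reference it with %X{connector.context} in
connect-log4j.properties.

import org.slf4j.MDC;

// Hedged illustration only: the framework would set a per-thread context before running
// a connector or task, so every log line produced on that thread can include it.
final class ConnectorContextExample {
    static void runWithContext(String connectorName, int taskId, Runnable work) {
        // e.g. "[my-connector|task-1] "; the exact format is defined by the KIP/PR
        MDC.put("connector.context", String.format("[%s|task-%d] ", connectorName, taskId));
        try {
            work.run();
        } finally {
            MDC.remove("connector.context");   // avoid leaking context onto reused threads
        }
    }
}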

On Tue, Apr 2, 2019 at 4:20 PM Randall Hauch  wrote:

> I've been working on https://github.com/apache/kafka/pull/5743 for a
> while,
> but there were a number of comment, suggestions, and mild concerns on the
> PR. One of those comments was that maybe changing the Connect log content
> in this way probably warrants a KIP. So here it is:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs
>
> I've also updated my PR reflect the KIP. Please reply with comments and/or
> feedback.
>
> Best regards,
>
> Randall
>


Re: [DISCUSS] KIP-437: Custom replacement for MaskField SMT

2019-04-12 Thread Randall Hauch
Thanks for the proposal and the PR, Valeria.

I've been working with Valeria on this PR, so it's not surprising I'm in
favor of this KIP and improvement so that users can easily mask values with
specific replacement values with the same primitive type.

This proposal has been open for over a month, and it's been almost that
long without other comments. Unless there are other objections, I would say
this KIP is ready for a vote.

Best regards,

Randall

On Thu, Mar 28, 2019 at 9:26 AM Valeria Vasylieva <
valeria.vasyli...@gmail.com> wrote:

> On Tue, 19 Mar 2019 at 17:59, Valeria Vasylieva <
> valeria.vasyli...@gmail.com
> >:
>
> > Hi Adam,
> >
> > Thank you for your response!
> > Dear community members, do you have any other thoughts on this KIP? Would
> > be great if you share them!
> >
> > Regards,
> >
> > Valeria
> >
> > On Sat, 16 Mar 2019 at 18:16, Adam Bellemare :
> >
> >> Hi Valeria
> >>
> >> I am thinking that a full map function via configuration is very
> unlikely
> >> to be feasible. At that point, it would be best for the user to create
> >> their own custom transformation.
> >>
> >> I think that since your function is indeed just an extension of masking
> >> that it is reasonable as presented. I don't have any other concerns with
> >> the proposal, but it would be good to hear from others.
> >>
> >> Thanks
> >>
> >>
> >> On Fri, Mar 15, 2019 at 11:38 AM Valeria Vasylieva <
> >> valeria.vasyli...@gmail.com> wrote:
> >>
> >> > Hi Adam,
> >> >
> >> > Thank you for your interest. Here is the list of currently supported
> >> > transformations in Connect:
> >> > https://kafka.apache.org/documentation/#connect_transforms.
> >> > As I can see, there is no "map" transformation in this list and all
> >> other
> >> > SMTs do not support functionality described in a KIP.
> >> > I cannot find the way to achieve the same result using existing
> >> > transformations.
> >> > The request, described in an issue was just to add this custom masking
> >> > functionality to the MaskField SMT, but if there is a need we can
> evolve
> >> > this issue and create separate "map" transformation,
> >> > it may be more useful but will require more effort, so it is better to
> >> do
> >> > it as separate issue.
> >> >
> >> > Kind Regards,
> >> > Valeria
> >> >
> >> > On Fri, 15 Mar 2019 at 17:35, Adam Bellemare  >:
> >> >
> >> > > Hi Valeria
> >> > >
> >> > > Thanks for the KIP. I admit my knowledge on Kafka Connect transforms
> >> is a
> >> > > bit rusty, however - Is there any other way to currently achieve
> this
> >> > same
> >> > > functionality outlined in your KIP using existing transforms?
> >> > >
> >> > >
> >> > > Thanks
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Mar 14, 2019 at 12:05 PM Valeria Vasylieva <
> >> > > valeria.vasyli...@gmail.com> wrote:
> >> > >
> >> > > > Dear community members,
> >> > > >
> >> > > > I would be very grateful if you leave any feedback on this KIP. It
> >> will
> >> > > > help me to understand if change is useful or not and to decide on
> >> > further
> >> > > > actions.
> >> > > >
> >> > > > Thank you in advance,
> >> > > > Valeria
> >> > > >
> >> > > > On Mon, 11 Mar 2019 at 13:20, Valeria Vasylieva <
> >> > > > valeria.vasyli...@gmail.com
> >> > > > >:
> >> > > >
> >> > > > > Hi All,
> >> > > > >
> >> > > > > I would like to start a discussion about adding new
> functionality
> >> to
> >> > > > > MaskField SMT. The existing implementation allows to mask out
> any
> >> > field
> >> > > > > value with the null equivalent of the field type.
> >> > > > >
> >> > > > > I suggest to add a possibility to provide a literal replacement
> >> for
> >> > the
> >> > > > > field. This way you can mask out any PII info (IP, SSN etc.)
> with
> >> any
> >> > > > > custom replacement.
> >> > > > >
> >> > > > > It is a short KIP which does not require major changes, but
> could
> >> > help
> >> > > to
> >> > > > > make this transformation more useful for the client.
> >> > > > >
> >> > > > > The KIP is here:
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-437%3A+Custom+replacement+for+MaskField+SMT
> >> > > > >
> >> > > > > I would be glad to receive any feedback on this KIP.
> >> > > > >
> >> > > > > Kind Regards,
> >> > > > > Valeria
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>


Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-12 Thread Ismael Juma
Hi Ying,

The actual reasons are important so that people can evaluate the KIP (and
vote). :) Thanks for providing a few more:

(1) force users to checkpoint in Kafka instead of ZooKeeper
(2) forbid an old Go (Sarama) client library which is known to have some
serious bugs
(3) force Kafka 1.x clients, with the ability to roll back if there's an
issue (unlike a message format upgrade)

Relying on min.version seems like a pretty clunky way to achieve the above
list. The challenge is that it's pretty difficult to do it in a way that
works for clients across languages. They each add support for new protocol
versions independently (it could even happen in a bug fix release). So, if
you tried to block Sarama in #2, you may block Java clients too.

For #3, it seems simplest to have a config that requires clients to support
a given message format version (or higher). For #2, it seems like you'd
want clients to advertise their versions. That would be useful for multiple
reasons.

Ismael

On Fri, Apr 12, 2019 at 8:42 PM Ying Zheng  wrote:

> Hi Ismael,
>
> Those are just examples. I think the administrators should be able to block
> certain client libraries for whatever reason. Some other possible reasons
> include, force users to check pointing in Kafka instead of zookeeper,
> forbid an old go (sarama) client library which is known to have some
> serious bugs.
>
> message.downconversion.enable does not solve our problems. We are now
> planning to upgrade to message format V3, and force users to upgrade to
> Kafka 1.x clients. With the proposed min.api.version setting, in case of
> there is anything wrong, we can roll back the setting. If we upgrade the
> file format, there is no way to rollback (Kafka doesn't support downgrading
> message format).
>
> On Thu, Apr 11, 2019 at 7:05 PM Ismael Juma  wrote:
>
> > Hi Ying,
> >
> > It looks to me that all the examples given in the KIP can be handled with
> > the existing "message.downconversion.enable" config and by configuring
> the
> > message format to be the latest:
> >
> > 1. Kafka 8 / 9 / 10 consumer hangs when the message contains message
> header
> > > ( KAFKA-6739 - Down-conversion fails for records with headers
> RESOLVED  )
> > > 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 ( KAFKA-3160 -
> > > Kafka LZ4 framing code miscalculates header checksum RESOLVED  )
> > > 3. Performance penalty of converting message format from V3 to V1 or V2
> > > for the old consumers (KIP-31 - Move to relative offsets in compressed
> > > message sets)
> >
> >
> > Am I missing something? Are there other examples that are not related to
> > message conversion?
> >
> > Ismael
> >
> > On Thu, Apr 11, 2019 at 11:53 PM Ying Zheng 
> > wrote:
> >
> > > Hi here,
> > >
> > > Please vote for
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
> > >
> > > Thank you!
> > >
> >
>


Re: [DISCUSS] KIP-450: Sliding Window Aggregations in the DSL

2019-04-12 Thread Bill Bejeck
Thanks for the KIP Sophie.

I have a couple of additional comments.

The current proposal only considers stream-time.

While I support this, each time we introduce a new operation based on
stream-time, invariably users request that operation support wall-clock
time as well.  Would we want to consider this option in the current KIP
proactively?

Also, I think the concept of a zero grace period is too restrictive. I may
have missed your response, but can you elaborate on the reasoning?

Thanks,
Bill


On Fri, Apr 12, 2019 at 12:14 PM Guozhang Wang  wrote:

> On Thu, Apr 11, 2019 at 2:10 PM Sophie Blee-Goldman 
> wrote:
>
> > Thanks for the comments Guozhang! I've answered your questions below
> >
> > On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang  wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the proposed KIP. I've made a pass over it and here are some
> > > thoughts:
> > >
> > > 1. "The window size is effectively the grace and retention period". The
> > > grace time is defined as "the time to admit late-arriving events after
> > the
> > > end of the window." hence it is the additional time beyond the window
> > size.
> > > I guess your were trying to say it should be zero?
> > >
> > > Also for retention period, it is not a notion of the window spec any
> > more,
> > > but only for the window store itself. So I'd suggest talking about
> window
> > > size here, and note that store retention time cannot be controlled via
> > > window spec at all.
> > >
> >
> > Yes, I meant to say the grace period is effectively zero -- the retention
> > period will ultimately be the same as the window size, which is
> > configurable, but it can't be configured independently if that's what you
> > mean?
> >
> >
> You can confiture retention via Materialized (in DSL), when specifying the
> store in KTable, or via WindowStoreBuilder#retentionPeriod (in PAPI). The
> point here is that they are specified independently from the Windows spec.
> So a user cannot control how long a materialized store can be retained from
> the window spec itself, she must do that via the mentioned methods before.
>
>
> >
> > > 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire
> a
> > > bucket, so I'd assume you will expire one bucket as a whole when its
> end
> > > time is smaller than the current window's starting time, right?
> > >
> >
> > Since this design assumes we don't have a subtracter, each bucket would
> > expire when it's start time is outside the current window; the remaining
> > values in that bucket are then aggregated with the "running aggregate" of
> > the next bucket to get the total aggregate for the entire window. I'll
> try
> > to come up with a diagram and/or better way to explain what I have in
> mind
> > here...
> > (The values themselves in the buckets will expire automatically by
> setting
> > the retention period of the underlying window store)
> >
> >
> > > 3. Also in your algorithm how to choose "M" seems tricky, would it be a
> > > configurable parameter exposed to users or is it abstracted away and
> only
> > > being selected internally?
> > >
> >
> > Good question. If we ignore the difference in cost between aggregation
> > operations and writes to the underlying store, the optimal value of M is
> > sqrt(N). But the reality is the aggregation might be very simple vs
> > expensive RocksDB writes -- conversely the aggregation itself could be
> > complicated/costly while the underlying store is cheap to write  (ie
> > in-memory). I do feel it should be abstracted away from the user however
> > and not an additional parameter they need to consider and tune (eg
> > segmentInterval) ... some profiling would probably be needed to
> determine a
> > reasonable choice
> >
> >
> > > 4. "There is some tradeoff between purely optimizing " seems incomplete
> > > paragraph?
> > >
> >
> > Whoops
> >
> >
> > > 5. Meta comment: for many aggregations it is commutative and
> associative
> > so
> > > we can require users to pass in a "substract" function as well. Given
> > these
> > > two function I think we can propose two set of APIs, 1) with the adder
> > and
> > > subtractor and 2) with the added only (if the aggregate logic is not
> > comm.
> > > and assoc.).
> > >
> > > We just maintain an aggregate value for each bucket (call it
> > > bucket_aggregate) plus for the whole window (call it total_aggregate),
> > i.e.
> > > at most M + 1 values per key. We use the total_aggregate for queries,
> and
> > > each update will cause 2 writes (to the bucket and to the total
> > aggregate).
> > >
> > > And with 1) when expiring the oldest bucket we simply call
> > > subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> > > oldest bucket we can re-compute the total_aggregate by
> > > sum(bucket_aggregate) over other buckets again.
> > >
> >
> > This is a good point, ie we can definitely be much smarter in our design
> if
> > we have a subtracter, in which case it's probably worth separate sets of
> > AP

[VOTE] KIP-431: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2019-04-12 Thread Mateusz Zakarczemny
Hi All,
This KIP has been under discussion for more than a month. The feedback is positive,
with no objections. Therefore, I would like to start voting.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter

Regards,
Mateusz Zakarczemny


Re: [DISCUSS] KIP-446: Add changelog topic configuration to KTable suppress

2019-04-12 Thread Bruno Cadonna
Matthias,

Are you sure the users are aware that with `withLoggingDisabled()`, they
might lose data during failover?

OK, maybe we do not necessarily need a WARN log. However, I would at least
add a comment like the one in `StoreBuilder`, i.e.,

/**
* Disable the changelog for store built by this {@link StoreBuilder}.
* This will turn off fault-tolerance for your store.
* By default the changelog is enabled.
* @return this
*/
StoreBuilder withLoggingDisabled();

What do you think?

Best,
Bruno
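
For readers following along, here is a sketch of how the proposed methods might look at
the call site, assuming KIP-446 adds withLoggingDisabled()/withLoggingEnabled(Map) to the
suppression buffer config in the same style as the StoreBuilder methods quoted above.
This reflects the proposal under discussion, not released API; durations are arbitrary
examples.

import java.time.Duration;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

// Sketch only: withLoggingDisabled() on BufferConfig is what KIP-446 proposes, not
// existing API. Disabling the changelog trades fault-tolerance of the suppression
// buffer for not having the extra topic.
class SuppressLoggingExample {
    static void build(KStream<String, String> input) {
        input.groupByKey()
             .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
             .count()
             .suppress(Suppressed.untilWindowCloses(
                 BufferConfig.unbounded().withLoggingDisabled()));
    }
}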

On Thu, Apr 11, 2019 at 12:04 AM Matthias J. Sax 
wrote:

> I think that the current proposal to add `withLoggingDisabled()` and
> `withLoggingEnabled(Map)` should be the best option.
>
> IMHO there is no reason to add a WARN log. We also don't have a WARN log
> when people disable logging on regular stores. As Bruno mentioned, this
> might also lead to data loss, so I don't see why we should treat
> suppress() different to other stores.
>
>
> -Matthias
>
> On 4/10/19 3:36 PM, Bruno Cadonna wrote:
> > Hi Marteen and John,
> >
> > I would opt for option 1 with an additional log message on INFO or WARN
> > level, since the log file is the place where you would look first to
> > understand what went wrong. I would also not adjust it when persistence
> > stores are available for suppress.
> >
> > I would not go for option 2 or 3, because IIUC, with
> > `withLoggingDisabled()` also persistent state stores do not guarantee not
> > to loose records. Persisting state stores is basically a way to optimize
> > recovery in certain cases. The changelog topic is the component that
> > guarantees no data loss. So regarding data loss, in my opinion, disabling
> > logging on the suppression buffer is not different from disabling logging
> > on other state stores. Please correct me if I am wrong.
> >
> > Best,
> > Bruno
> >
> > On Wed, Apr 10, 2019 at 12:12 PM John Roesler  wrote:
> >
> >> Thanks for the update and comments, Maarten. It would be interesting to
> >> hear what others think as well.
> >> -John
> >>
> >> On Thu, Apr 4, 2019 at 2:43 PM Maarten Duijn 
> wrote:
> >>
> >>> Thank you for the explanation regarding the internals, I have edited
> the
> >>> KIP accordingly and updated the Javadoc. About the possible data loss
> >> when
> >>> altering changelog config, I think we can improve by doing (one of) the
> >>> following.
> >>>
> >>> 1) Add a warning in the comments that clearly states what might happen
> >>> when change logging is disabled and adjust it when persistent stores
> are
> >>> added.
> >>>
> >>> 2) Change `withLoggingDisabled` to `minimizeLogging`. Instead of
> >> disabling
> >>> logging, a call to this method minimizes the topic size by aggressively
> >>> removing the records emitted downstream by the suppress operator. I
> >> believe
> >>> this can be achieved by setting `delete.retention.ms=0` in the topic
> >>> config.
> >>>
> >>> 3) Remove `withLoggingDisabled` from the proposal.
> >>>
> >>> 4) Leave both methods as-proposed, as you indicated, this is in line
> with
> >>> the other parts of the Streams API
> >>>
> >>> A user might want to disable logging when downstream is not a Kafka
> topic
> >>> but some other service that does not benefit from atleast-once-delivery
> >> of
> >>> the suppressed records in case of failover or rebalance.
> >>> Seeing as it might cause data loss, the methods should not be used
> >> lightly
> >>> and I think some comments are warranted. Personally, I rely purely on
> >> Kafka
> >>> to prevent data loss even when a store persisted locally, so when
> support
> >>> is added for persistent suppression, I feel the comments may stay.
> >>>
> >>> Maarten
> >>>
> >>
> >
>
>


Re: [VOTE] KIP-431: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2019-04-12 Thread Ismael Juma
Hi Mateusz,

The KIP looks good. Just a few questions/suggestions:

1. It would be helpful to see an example of the output with everything
enabled.
2. What are the default values for the properties (eg what's the default
header separator).
3. What is the separator used between key/value and the new fields?

Ismael

On Fri, Apr 12, 2019 at 9:43 PM Mateusz Zakarczemny 
wrote:

> Hi All,
> This KIP is in discussion for more than a month. The feedback is positive
> without any objection comments. Therefore, I would like to start voting.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter
>
> Regards,
> Mateusz Zakarczemny
>


Re: [DISCUSS] KIP-446: Add changelog topic configuration to KTable suppress

2019-04-12 Thread Bill Bejeck
Thanks for the KIP Maarten.

I also agree that keeping the `withLoggingDisabled()` and
`withLoggingEnabled(Map)` methods is the better option.

When it comes to educating the users on the downside of disabling logging,
IMHO a comment in the JavaDoc should be sufficient.
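
For illustration, a minimal sketch of how this could look in a topology, assuming the
proposed withLoggingDisabled() ends up on Suppressed.BufferConfig (the exact placement
is still up to the KIP):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class SuppressWithoutChangelogSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
               .count()
               // Assumption: withLoggingDisabled() is available on the buffer config
               // as proposed; it turns off the changelog (and hence fault tolerance)
               // for the suppression buffer only.
               .suppress(Suppressed.untilWindowCloses(
                       Suppressed.BufferConfig.unbounded().withLoggingDisabled()))
               .toStream()
               .to("output");
    }
}

A javadoc note on that method, mirroring the StoreBuilder wording quoted further down
in this thread, would then be the natural place for the data-loss caveat.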

-Bill

On Fri, Apr 12, 2019 at 3:59 PM Bruno Cadonna  wrote:

> Matthias,
>
> Are you sure the users are aware that with `withLoggingDisabled()`, they
> might lose data during failover?
>
> OK, we maybe do not necessarily need a WARN log. However, I would at least
> add a comment like in `StoreBuilder`,ie,
>
> /**
> * Disable the changelog for store built by this {@link StoreBuilder}.
> * This will turn off fault-tolerance for your store.
> * By default the changelog is enabled.
> * @return this
> */
> StoreBuilder<T> withLoggingDisabled();
>
> What do you think?
>
> Best,
> Bruno
>
> On Thu, Apr 11, 2019 at 12:04 AM Matthias J. Sax 
> wrote:
>
> > I think that the current proposal to add `withLoggingDisabled()` and
> > `withLoggingEnabled(Map)` should be the best option.
> >
> > IMHO there is no reason to add a WARN log. We also don't have a WARN log
> > when people disable logging on regular stores. As Bruno mentioned, this
> > might also lead to data loss, so I don't see why we should treat
> > suppress() different to other stores.
> >
> >
> > -Matthias
> >
> > On 4/10/19 3:36 PM, Bruno Cadonna wrote:
> > > Hi Maarten and John,
> > >
> > > I would opt for option 1 with an additional log message on INFO or WARN
> > > level, since the log file is the place where you would look first to
> > > understand what went wrong. I would also not adjust it when persistence
> > > stores are available for suppress.
> > >
> > > I would not go for option 2 or 3, because IIUC, with
> > > `withLoggingDisabled()` also persistent state stores do not guarantee
> not
> > > to lose records. Persisting state stores is basically a way to
> optimize
> > > recovery in certain cases. The changelog topic is the component that
> > > guarantees no data loss. So regarding data loss, in my opinion,
> disabling
> > > logging on the suppression buffer is not different from disabling
> logging
> > > on other state stores. Please correct me if I am wrong.
> > >
> > > Best,
> > > Bruno
> > >
> > > On Wed, Apr 10, 2019 at 12:12 PM John Roesler 
> wrote:
> > >
> > >> Thanks for the update and comments, Maarten. It would be interesting
> to
> > >> hear what others think as well.
> > >> -John
> > >>
> > >> On Thu, Apr 4, 2019 at 2:43 PM Maarten Duijn 
> > wrote:
> > >>
> > >>> Thank you for the explanation regarding the internals, I have edited
> > the
> > >>> KIP accordingly and updated the Javadoc. About the possible data loss
> > >> when
> > >>> altering changelog config, I think we can improve by doing (one of)
> the
> > >>> following.
> > >>>
> > >>> 1) Add a warning in the comments that clearly states what might
> happen
> > >>> when change logging is disabled and adjust it when persistent stores
> > are
> > >>> added.
> > >>>
> > >>> 2) Change `withLoggingDisabled` to `minimizeLogging`. Instead of
> > >> disabling
> > >>> logging, a call to this method minimizes the topic size by
> aggressively
> > >>> removing the records emitted downstream by the suppress operator. I
> > >> believe
> > >>> this can be achieved by setting `delete.retention.ms=0` in the topic
> > >>> config.
> > >>>
> > >>> 3) Remove `withLoggingDisabled` from the proposal.
> > >>>
> > >>> 4) Leave both methods as-proposed, as you indicated, this is in line
> > with
> > >>> the other parts of the Streams API
> > >>>
> > >>> A user might want to disable logging when downstream is not a Kafka
> > topic
> > >>> but some other service that does not benefit from
> atleast-once-delivery
> > >> of
> > >>> the suppressed records in case of failover or rebalance.
> > >>> Seeing as it might cause data loss, the methods should not be used
> > >> lightly
> > >>> and I think some comments are warranted. Personally, I rely purely on
> > >> Kafka
> > >>> to prevent data loss even when a store persisted locally, so when
> > support
> > >>> is added for persistent suppression, I feel the comments may stay.
> > >>>
> > >>> Maarten
> > >>>
> > >>
> > >
> >
> >
>


Re: [DISCUSS] KIP-449: Add connector contexts to Connect worker logs

2019-04-12 Thread Randall Hauch
Thanks for the review and feedback, Konstantine.

1. Great suggestion. I've updated the KIP to hopefully make it more clear
that the uncommented line is unchanged from the existing Log4J
configuration file.

2. Including a `-` before the task number is acceptable if it makes it easier
to read and filter. I've updated the KIP and PR to incorporate your
suggestion.

3. Task numbers do start at 0, as seen by the DistributedHerder code that
creates the tasks (
https://github.com/apache/kafka/blob/02221bd907a23041c95ce6446986bff631652b3a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L608-L611).
I've updated the KIP to highlight that task numbers are 0-based. As you can
see in the sample log included in the KIP, "task-0" corresponds to the
first task, not to the connector. In fact, the following are examples of
the log messages with the "worker" scope where the Connector implementation
is called:

[2019-04-02 17:01:38,315] INFO [local-file-source|worker] Creating
connector local-file-source of type FileStreamSource
(org.apache.kafka.connect.runtime.Worker:227)
[2019-04-02 17:01:38,317] INFO [local-file-source|worker] Instantiated
connector local-file-source with version 2.3.0-SNAPSHOT of type class
org.apache.kafka.connect.file.FileStreamSourceConnector
(org.apache.kafka.connect.runtime.Worker:230)
...

[2019-04-02 17:11:46,408] INFO [local-file-sink|worker] Stopping connector
local-file-sink (org.apache.kafka.connect.runtime.Worker:334)
[2019-04-02 17:11:46,408] INFO [local-file-sink|worker] Stopped connector
local-file-sink (org.apache.kafka.connect.runtime.Worker:350)


The FileStreamSourceConnector class does not actually log any messages
itself, but if it did, those would appear with the "[local-file-source|worker]"
context and would include the name of the class and line number in
parentheses (instead of e.g.,
"org.apache.kafka.connect.runtime.Worker:230").

4. I thought about doing a more complex example, but IMO the extra log
lines made the *sample* in the KIP quite a bit longer and harder to understand.
I thought it important to keep the KIP a bit more readable while showing
which scopes appear for the different log messages. A second task would
essentially have nearly the same log messages, just with a task number
in the scope.

5. The PR already changes the `runtime/src/test/resources/log4j.properties`
to include the connector context MDC parameter. Because that's not a public
API, I've not mentioned it in the KIP.

6. I guess I understand your goal - it's not always clear which worker is
being referenced. However, I'm not sure whether it's that valuable to
include the "client.id" (if set) just in the "worker" scope. That means
that it's maybe more useful to introduce a second MDC parameter (e.g., "%{
worker.id}") and optionally include that on all log messages. We'd have to
set the MDC context in the code in each thread, which isn't too much
effort. The other advantage of this approach is that it doesn't have to be
configurable: you can control it via your own logging configuration file
(rather than optionally including it in the "worker" scope on some of the
log messages). Thoughts? What would the "%{worker.id}" MDC value be set to
if "client.id" is not set?

Final notes: Removed the placeholders, and corrected the typos and grammar.

Thanks again for the detailed review!

On Fri, Apr 12, 2019 at 2:05 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Thanks for the KIP Randall.
>
> It might not be obvious right away, but this is a great improvement when
> running Connect with multiple connectors or when debugging Connect, for
> instance in integration tests or system tests. KIP looks good to me
> overall, I have just a few comments below:
>
> 1. In the snippet of the config, by including an uncommented line, maybe
> it's not immediately clear that this line is an existing line in
> connect-log4j.properties and not an addition. Should this be mentioned in a
> separate code block or in a different way?
>
> 2. Currently when adding the taskId to a connector or connector task, we
> precede it with a dash (-). I feel this addition would make it easier to
> parse the taskId visually as well as with parsing tools (for example I can
> use `-` as a delimiter easier than 'k' or even a multi-character delimiter
> such as 'task'). Do you think it's an overhead?
>
> 3. I think a specific mention to the convention used on the base-index for
> taskIds would be useful. For example, taskId equal to zero in most cases
> represents the (source/sink) connector, while taskId > 0 corresponds to
> (source/sink) tasks. By reading the KIP, I assume that task0 (or task-0 if
> you agree with my previous comment) will be the connector. Is this the
> case?
>
> 4. Should the example include a snippet for taskId > 0?
>
> 5. Do you intend to enable connector MDC in tests? In this case how would
> you distinguish between multiple workers in integration tests? 

Re: [VOTE] KIP-411: Make default Kafka Connect worker task client IDs distinct

2019-04-12 Thread Ryanne Dolan
+1 (non binding)

Thanks
Ryanne

On Fri, Apr 12, 2019, 11:11 AM Paul Davidson
 wrote:

> Just a reminder that KIP-411 is open for voting. No votes received yet!
>
> On Fri, Apr 5, 2019 at 9:07 AM Paul Davidson 
> wrote:
>
> > Hi all,
> >
> > Since we seem to have agreement in the discussion I would like to start
> > the vote on KIP-411.
> >
> > See:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct
> >
> > Also see the related PR: https://github.com/apache/kafka/pull/6097
> >
> > Thanks to everyone who contributed!
> >
> > Paul
> >
>


Re: [VOTE] KIP-445: In-memory session store

2019-04-12 Thread Sophie Blee-Goldman
Hi all,

This KIP passes with three +1 (binding) -- thanks Matthias, Bill, and
Guozhang!

For those interested, feel free to check out and join the ongoing PR review
here: https://github.com/apache/kafka/pull/6525
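
For anyone wanting to try it out, a minimal usage sketch (assuming the new factory
method lands as Stores.inMemorySessionStore, as in the PR above):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;

public class InMemorySessionStoreSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("clicks")
               .groupByKey()
               .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
               // Assumption: Stores.inMemorySessionStore(name, retention) is the
               // factory added by this KIP; retention must cover the session gap.
               .count(Materialized.as(
                       Stores.inMemorySessionStore("clicks-per-session",
                                                   Duration.ofMinutes(30))));
    }
}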

Cheers,
Sophie

On Wed, Apr 10, 2019 at 1:21 PM Guozhang Wang  wrote:

> +1 (binding).
>
> On Tue, Apr 9, 2019 at 5:46 PM Bill Bejeck  wrote:
>
> > Thanks for the KIP Sophie.
> >
> > +1(binding)
> >
> > -Bill
> >
> > On Tue, Apr 9, 2019 at 12:14 AM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the KIP Sophie!
> > >
> > > +1 (binding)
> > >
> > >
> > > -Matthias
> > >
> > > On 4/8/19 5:26 PM, Sophie Blee-Goldman wrote:
> > > > Hello all,
> > > >
> > > > There has been a positive reception so I'd like to call for a vote on
> > > > KIP-445, augmenting our session store options with an in-memory
> > version.
> > > > This would round out our store API to offer in-memory and persistent
> > > > versions of all three types of stores.
> > > >
> > > > KIP:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-445%3A+In-memory+Session+Store
> > > > PR: https://github.com/apache/kafka/pull/6525
> > > > JIRA: https://issues.apache.org/jira/browse/KAFKA-8029
> > > >
> > > > This would also open up the possibility of migrating some of the
> > > > unit/integration tests to in-memory stores to speed things up a bit
> ;)
> > > >
> > > > Cheers,
> > > > Sophie
> > > >
> > >
> > >
> >
>
>
> --
> -- Guozhang
>


[DISCUSS] 2.3.0 release

2019-04-12 Thread Colin McCabe
Hi all,

I'd like to volunteer to be the release manager for our next feature 
release, the 2.3 release.

If that sounds good, I'll post a release plan on Monday.

cheers,
Colin


Re: [DISCUSS] 2.3.0 release

2019-04-12 Thread Ismael Juma
Thanks for volunteering Colin, +1.

Ismael

On Sat, Apr 13, 2019 at 2:05 AM Colin McCabe  wrote:

> Hi all,
>
> I'd like to volunteer to be the release manager for our next feature
> release, the 2.3 release.
>
> If that sounds good, I'll post a release plan on Monday.
>
> cheers,
> Colin
>


Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams

2019-04-12 Thread Jason Gustafson
Hi Guozhang,

Responses below:

2. The interface's default implementation will just be
> `onPartitionRevoked`, so for user's instantiation if they do not make any
> code changes they should be able to recompile the code and continue.


Ack, makes sense.
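
(For readers skimming the thread: the kind of compatible default being described
might look roughly like the sketch below. This is illustrative only; the callback
name is a placeholder, not the exact interface change in KIP-429.)

import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

public interface RebalanceListenerSketch {
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    // New callback with a default that simply delegates to onPartitionsRevoked,
    // so existing implementations keep their current behavior after a recompile.
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}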

4. Hmm.. not sure if it will work. The main issue is that the
> consumer-coordinator behavior (whether to revoke all or none at
> onRebalancePrepare) is independent of the selected protocol's assignor
> (eager or cooperative), so even if the assignor is selected to be the
> old-versioned one, we will still not revoke at the consumer-coordinator
> layer and hence has the same risk of migrating still-owned partitions,
> right?


Yeah, basically we would have to push the eager/cooperative logic into the
PartitionAssignor itself and make the consumer aware of the rebalance
protocol it is compatible with. As long as an eager protocol _could_ be
selected, the consumer would have to be pessimistic and do eager
revocation. But if all the assignors configured in the consumer support
cooperative reassignment, then either 1) a cooperative protocol will be
selected and cooperative revocation can be safely used, or 2) if the rest
of the group does not support it, then the consumer will simply fail.

Another point which you raised offline and I will repeat here is that this
proposal's benefit is mostly limited to sticky assignment logic. Arguably
the range assignor may have some incidental stickiness, particularly if the
group is rebalancing for a newly created or deleted topic. For other cases,
the proposal is mostly additional overhead since it takes an additional
rebalance and many of the partitions will move. Perhaps it doesn't make as
much sense to use the cooperative protocol for strategies like range and
round-robin. That kind of argues in favor of pushing some of the control
into the assignor itself. Maybe we would not bother creating
CooperativeRange as I suggested above, but it would make sense to create a
cooperative version of the sticky assignment strategy. I thought we might
have to create a new sticky assignor anyway, because I can't see how we
would get compatible behavior when mixing with the old version.
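
As a concrete illustration of that direction, opting a consumer into such a strategy
might look like the sketch below (the assignor class name is hypothetical here; no
cooperative sticky assignor exists yet):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeAssignorConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Hypothetical class name: if every member lists only cooperative-capable
        // assignors, the group can safely use cooperative revocation; otherwise
        // the consumer has to fall back to (or fail on) eager revocation.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}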

Thanks,
Jason


On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang  wrote:

> Hello Matthias:
>
> Thanks for your review.
>
> The background section uses streams assignor as well as the consumer's own
> sticky assignor as examples illustrating the situation, but this KIP is for
> consumer coordinator itself, and the rest of the paragraph did not talk
> about Streams any more. If you feel it's a bit distracting I can remove
> those examples.
>
> 10). While working on the PR I realized that the revoked partitions on
> assignment is not needed (this is being discussed on the PR itself:
> https://github.com/apache/kafka/pull/6528#issuecomment-480009890
>
> 20). 1.a. Good question, I've updated the wiki to let the consumer clean up
> its assignment and re-join, and not let the assignor make any proactive
> changes. The idea is to keep the logic simpler and not do any "split brain"
> stuff.
>
> 20). 2.b. No we do not need, since the owned-partitions will be part of the
> Subscription passed in to assign() already.
>
> 30). As Boyang mentioned, there are some drawbacks that still cannot be
> addressed by the rebalance delay, hence we still went with KIP-345 (some more
> details can be found on the discussion thread of KIP-345 itself). One
> example is that as the instance resumes, its member id will be empty, so we
> are still relying on the assignor to give it the assignment from the old
> member-id while keeping all other members' assignments unchanged.
>
> 40). Incomplete sentence, I've updated it.
>
> 50). Here's my idea: suppose we augment the join group schema with
> `protocol version` in 2.3, and then with both brokers and clients being in
> version 2.3+, on the first rolling bounce where subscription and assignment
> schema and / or user metadata has changed, this protocol version will be
> bumped. On the broker side, when receiving all member's join-group request,
> it will choose the one that has the highest protocol version (also it
> assumes higher versioned protocol is always backward compatible, i.e. the
> coordinator can recognize lower versioned protocol as well) and select it
> as the leader. Then the leader can decide, based on its received and
> deserialized subscription information, how to assign partitions and how to
> encode the assignment accordingly so that everyone can understand it. With
> this, in Streams for example, no version probing would be needed since we
> are guaranteed the leader knows everyone's version -- again it is assuming
> that higher versioned protocol is always backward compatible -- and hence
> can successfully do the assignment at that round.
>
> 60). My bad, this section was not updated while the design was evolved,
> I've updated it.
>
>
> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen  wrote:
>
> >
> > Thanks for the review Matthias! My

Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-12 Thread Harsha
Hi,
  
"Relying on min.version seems like a pretty clunky way to achieve the above
> list. The challenge is that it's pretty difficult to do it in a way that
> works for clients across languages. They each add support for new protocol
> versions independently (it could even happen in a bug fix release). So, if
> you tried to block Sarama in #2, you may block Java clients too."

That's the intended effect, right? If you, as the admin/operator, configure the 
broker to have min.api.version set to 1.1, it should block Java, Sarama, and other 
clients that are below the 1.1 protocol. As mentioned, this is not just about the 
log.format upgrade problem; in general it is a way to force users to upgrade their 
client version in a multi-tenant environment.

"> For #3, it seems simplest to have a config that requires clients to support
> a given message format version (or higher). For #2, it seems like you'd
> want clients to advertise their versions. That would be useful for multiple
> reasons."
This KIP offers the ability to block clients based on the protocol versions they 
support, and that should be independent of the message format upgrade. Not all 
features or bugs depend on the message format, and tying client blocking to the 
message format means we would have to upgrade message.format first; we cannot simply 
say we have 1.1 brokers on the 0.8.2 message format and now want to block all 0.8.x 
clients.

min.api.version helps at the cluster level to say that all users are required to 
upgrade their clients so that, at a minimum, they speak min.api.version. It is not 
tied to message.format, because one does not always want to upgrade the message 
format just to block old clients.


To Gwen's point, I think we should also state in the error message that the broker 
only supports min.api.version and above, so that users see a clear message and 
upgrade to a newer version.


Thanks,
Harsha


On Fri, Apr 12, 2019, at 12:19 PM, Ismael Juma wrote:
> Hi Ying,
> 
> The actual reasons are important so that people can evaluate the KIP (and
> vote). :) Thanks for providing a few more:
> 
> (1) force users to check pointing in Kafka instead of zookeeper
> (2) forbid an old go (sarama) client library which is known to have some
> serious bugs
> (3) force kafka 1.x clients with the ability to roll back if there's an
> issue (unlike a message format upgrade)
> 
> Relying on min.version seems like a pretty clunky way to achieve the above
> list. The challenge is that it's pretty difficult to do it in a way that
> works for clients across languages. They each add support for new protocol
> versions independently (it could even happen in a bug fix release). So, if
> you tried to block Sarama in #2, you may block Java clients too.
> 
> For #3, it seems simplest to have a config that requires clients to support
> a given message format version (or higher). For #2, it seems like you'd
> want clients to advertise their versions. That would be useful for multiple
> reasons.
> 
> Ismael
> 
> On Fri, Apr 12, 2019 at 8:42 PM Ying Zheng  wrote:
> 
> > Hi Ismael,
> >
> > Those are just examples. I think the administrators should be able to block
> > certain client libraries for whatever reason. Some other possible reasons
> > include, force users to check pointing in Kafka instead of zookeeper,
> > forbid an old go (sarama) client library which is known to have some
> > serious bugs.
> >
> > message.downconversion.enable does not solve our problems. We are now
> > planning to upgrade to message format V3, and force users to upgrade to
> > Kafka 1.x clients. With the proposed min.api.version setting, in case of
> > there is anything wrong, we can roll back the setting. If we upgrade the
> > file format, there is no way to rollback (Kafka doesn't support downgrading
> > message format).
> >
> > On Thu, Apr 11, 2019 at 7:05 PM Ismael Juma  wrote:
> >
> > > Hi Ying,
> > >
> > > It looks to me that all the examples given in the KIP can be handled with
> > > the existing "message.downconversion.enable" config and by configuring
> > the
> > > message format to be the latest:
> > >
> > > 1. Kafka 8 / 9 / 10 consumer hangs when the message contains message
> > header
> > > > ( KAFKA-6739 - Down-conversion fails for records with headers
> > RESOLVED  )
> > > > 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 ( KAFKA-3160 -
> > > > Kafka LZ4 framing code miscalculates header checksum RESOLVED  )
> > > > 3. Performance penalty of converting message format from V3 to V1 or V2
> > > > for the old consumers (KIP-31 - Move to relative offsets in compressed
> > > > message sets)
> > >
> > >
> > > Am I missing something? Are there other examples that are not related to
> > > message conversion?
> > >
> > > Ismael
> > >
> > > On Thu, Apr 11, 2019 at 11:53 PM Ying Zheng 
> > > wrote:
> > >
> > > > Hi here,
> > > >
> > > > Please vote for
> > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+broker

Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-12 Thread Ismael Juma
Hi Harsha,

There is no such thing as 1.1 protocol. I encourage you to describe an
example config that achieves what you are suggesting here. It's pretty
complicated because the versions are per API and each client evolves
independently.

Ismael

On Sat, Apr 13, 2019 at 4:09 AM Harsha  wrote:

> Hi,
>
> "Relying on min.version seems like a pretty clunky way to achieve the above
> > list. The challenge is that it's pretty difficult to do it in a way that
> > works for clients across languages. They each add support for new
> protocol
> > versions independently (it could even happen in a bug fix release). So,
> if
> > you tried to block Sarama in #2, you may block Java clients too."
>
> That's the intended effect, right?  if you as the admin/operator
> configures the broker to have min.api.version to be 1.1
> it should block java , sarama clients etc.. which are below the 1.1
> protocol.  As mentioned this is not just related to log.format upgrade
> problem but in general a forcing cause to get the users to upgrade their
> client version in a multi-tenant environment.
>
> "> For #3, it seems simplest to have a config that requires clients to
> support
> > a given message format version (or higher). For #2, it seems like you'd
> > want clients to advertise their versions. That would be useful for
> multiple
> > reasons."
> This kip offers the ability to block clients based on the protocol they
> support. This should be independent of the message format upgrade. Not all
> of the features or bugs are dependent on a message format and having a
> message format dependency to block clients means we have to upgrade to
> message.format and we cannot just say we've 1.1 brokers with 0.8.2 message
> format and now we want to block all 0.8.x clients.
>
> min.api.version helps at the cluster level to say that all users required
> to upgrade clients to the at minimum need to speak the min.api.version and
> not tie to message.format because not all cases one wants to upgrade the
> message format and block the old clients.
>
>
> To Gwen's point, I think we should also return in the error message that
> the broker only supports min.api.version and above. So that users can see a
> clear message and upgrade to a newer version.
>
>
> Thanks,
> Harsha
>
>
> On Fri, Apr 12, 2019, at 12:19 PM, Ismael Juma wrote:
> > Hi Ying,
> >
> > The actual reasons are important so that people can evaluate the KIP (and
> > vote). :) Thanks for providing a few more:
> >
> > (1) force users to check pointing in Kafka instead of zookeeper
> > (2) forbid an old go (sarama) client library which is known to have some
> > serious bugs
> > (3) force kafka 1.x clients with the ability to roll back if there's an
> > issue (unlike a message format upgrade)
> >
> > Relying on min.version seems like a pretty clunky way to achieve the
> above
> > list. The challenge is that it's pretty difficult to do it in a way that
> > works for clients across languages. They each add support for new
> protocol
> > versions independently (it could even happen in a bug fix release). So,
> if
> > you tried to block Sarama in #2, you may block Java clients too.
> >
> > For #3, it seems simplest to have a config that requires clients to
> support
> > a given message format version (or higher). For #2, it seems like you'd
> > want clients to advertise their versions. That would be useful for
> multiple
> > reasons.
> >
> > Ismael
> >
> > On Fri, Apr 12, 2019 at 8:42 PM Ying Zheng 
> wrote:
> >
> > > Hi Ismael,
> > >
> > > Those are just examples. I think the administrators should be able to
> block
> > > certain client libraries for whatever reason. Some other possible
> reasons
> > > include, force users to check pointing in Kafka instead of zookeeper,
> > > forbid an old go (sarama) client library which is known to have some
> > > serious bugs.
> > >
> > > message.downconversion.enable does not solve our problems. We are now
> > > planning to upgrade to message format V3, and force users to upgrade to
> > > Kafka 1.x clients. With the proposed min.api.version setting, in case
> of
> > > there is anything wrong, we can roll back the setting. If we upgrade
> the
> > > file format, there is no way to rollback (Kafka doesn't support
> downgrading
> > > message format).
> > >
> > > On Thu, Apr 11, 2019 at 7:05 PM Ismael Juma  wrote:
> > >
> > > > Hi Ying,
> > > >
> > > > It looks to me that all the examples given in the KIP can be handled
> with
> > > > the existing "message.downconversion.enable" config and by
> configuring
> > > the
> > > > message format to be the latest:
> > > >
> > > > 1. Kafka 8 / 9 / 10 consumer hangs when the message contains message
> > > header
> > > > > ( KAFKA-6739 - Down-conversion fails for records with headers
> > > RESOLVED  )
> > > > > 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 (
> KAFKA-3160 -
> > > > > Kafka LZ4 framing code miscalculates header checksum RESOLVED  )
> > > > > 3. Performance penalty of converting message format from 

Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-12 Thread Harsha
Hi Ismael,
I meant to say blocking clients based on their API version 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/ApiVersion.scala#L48
 
But if I understand what you are saying, each client release can support 
different versions for each of fetch, produce, offset commit, etc., so it's 
harder to block based on a single min.api.version setting across different 
clients.
The idea I had in mind was to do this via ApiVersionsRequest: when a client 
makes an ApiVersionsRequest to the broker, the response returns the min and max 
version supported for each API. When min.api.version is enabled on the broker, it 
returns the max version it supports for each request in that release as the min 
version advertised to clients.

Example:
A Kafka 1.1.1 broker has min.api.version set to 
https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/api/ApiVersion.scala#L79
 (KAFKA_1_1_IV0) and a client makes an ApiVersionsRequest. In the response, for 
example for the produce request 
https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L112
instead of returning all of the supported versions, the broker will return 
PRODUCE_REQUEST_V5 as the only supported version.
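
In pseudo-code, the version-clamping idea would be something like the sketch below
(illustrative only, not actual broker code; the numbers are assumptions for the
PRODUCE API):

public class MinApiVersionSketch {
    public static void main(String[] args) {
        short supportedMin = 0;    // oldest PRODUCE version this broker can parse
        short supportedMax = 5;    // newest PRODUCE version in this broker release
        short configuredFloor = 5; // assumed floor derived from min.api.version=KAFKA_1_1_IV0

        short advertisedMin = (short) Math.max(supportedMin, configuredFloor);
        // The ApiVersionsResponse entry for PRODUCE would then advertise
        // [advertisedMin, supportedMax] -- here only v5 -- so clients that cannot
        // speak v5 fail version negotiation instead of silently producing.
        System.out.printf("PRODUCE: min=%d max=%d%n", advertisedMin, supportedMax);
    }
}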

Irrespective of the above approach, I understand your point still stands: Sarama 
might not choose to implement all the higher-version protocols for the Kafka 1.1 
release and might introduce a higher version of the produce request in a 
subsequent minor release, so it will be harder for users to figure out which 
release of the Sarama client they can use.


Ying, if you have a different approach which might address this issue, please 
add it.


Thanks,
Harsha

On Fri, Apr 12, 2019, at 7:23 PM, Ismael Juma wrote:
> Hi Harsha,
> 
> There is no such thing as 1.1 protocol. I encourage you to describe an
> example config that achieves what you are suggesting here. It's pretty
> complicated because the versions are per API and each client evolves
> independently.
> 
> Ismael
> 
> On Sat, Apr 13, 2019 at 4:09 AM Harsha  wrote:
> 
> > Hi,
> >
> > "Relying on min.version seems like a pretty clunky way to achieve the above
> > > list. The challenge is that it's pretty difficult to do it in a way that
> > > works for clients across languages. They each add support for new
> > protocol
> > > versions independently (it could even happen in a bug fix release). So,
> > if
> > > you tried to block Sarama in #2, you may block Java clients too."
> >
> > That's the intended effect, right?  if you as the admin/operator
> > configures the broker to have min.api.version to be 1.1
> > it should block java , sarama clients etc.. which are below the 1.1
> > protocol.  As mentioned this is not just related to log.format upgrade
> > problem but in general a forcing cause to get the users to upgrade their
> > client version in a multi-tenant environment.
> >
> > "> For #3, it seems simplest to have a config that requires clients to
> > support
> > > a given message format version (or higher). For #2, it seems like you'd
> > > want clients to advertise their versions. That would be useful for
> > multiple
> > > reasons."
> > This kip offers the ability to block clients based on the protocol they
> > support. This should be independent of the message format upgrade. Not all
> > of the features or bugs are dependent on a message format and having a
> > message format dependency to block clients means we have to upgrade to
> > message.format and we cannot just say we've 1.1 brokers with 0.8.2 message
> > format and now we want to block all 0.8.x clients.
> >
> > min.api.version helps at the cluster level to say that all users required
> > to upgrade clients to the at minimum need to speak the min.api.version and
> > not tie to message.format because not all cases one wants to upgrade the
> > message format and block the old clients.
> >
> >
> > To Gwen's point, I think we should also return in the error message that
> > the broker only supports min.api.version and above. So that users can see a
> > clear message and upgrade to a newer version.
> >
> >
> > Thanks,
> > Harsha
> >
> >
> > On Fri, Apr 12, 2019, at 12:19 PM, Ismael Juma wrote:
> > > Hi Ying,
> > >
> > > The actual reasons are important so that people can evaluate the KIP (and
> > > vote). :) Thanks for providing a few more:
> > >
> > > (1) force users to check pointing in Kafka instead of zookeeper
> > > (2) forbid an old go (sarama) client library which is known to have some
> > > serious bugs
> > > (3) force kafka 1.x clients with the ability to roll back if there's an
> > > issue (unlike a message format upgrade)
> > >
> > > Relying on min.version seems like a pretty clunky way to achieve the
> > above
> > > list. The challenge is that it's pretty difficult to do it in a way that
> > > works for clients across languages. They each add support for new
> > protocol
> > > versions independently (it could even happen in a bug fix release). So,
> > if
> > > you tried