Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-05-13 Thread David Jacot
Hi Jun,

Coming back to your question regarding the differences between the token
bucket algorithm and our current quota mechanism. I did some tests and
they confirmed my first intuition that our current mechanism does not work
well with a bursty workload. Let me try to illustrate the difference with an
example. One important aspect to keep in mind is that we don't want to
reject requests when the quota is exhausted.

Let's say that we want to guarantee an average rate R=5 partitions/sec while
allowing a burst B=500 partitions.

With our current mechanism, this translates to the following parameters:
- Quota = 5
- Samples = B / R + 1 = 101 (to allow the burst)
- Time Window = 1s (the default)

Now, let's say that a client wants to create 7 topics with 80 partitions each
at time T. It brings the rate to 5.6 (7 * 80 / 100), which is above the quota,
so any new request is rejected until the measured rate falls back to R. In
theory, the client must wait 12 secs ((5.6 - 5) / 5 * 100) to get it back to R.
In practice, due to the sparse samples (one sample worth 560 partitions), the
rate won't decrease until that sample is dropped, which only happens after 101
secs. It gets worse if the burst is increased.

With the token bucket algorithm, this translates to the following parameters:
- Rate = 5
- Tokens = 500

The same request decreases the number of available tokens to -60 which is
below 0 so any new request is rejected until the number of available tokens
gets back above 0. This takes 12 secs ((60+1) / 5).

The token bucket algorithm is better suited for bursty workloads, which is our
case here. I hope that this example helps to clarify the choice.
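
For anyone who wants to reproduce these numbers, here is a minimal,
self-contained token bucket sketch in Java (an illustration of the algorithm
only, not the broker-side implementation proposed in the KIP):

{code:java}
// Minimal token-bucket sketch used to reproduce the numbers above:
// Rate = 5 tokens/sec, Burst = 500 tokens.
public class TokenBucket {
    private final double rate;      // tokens added per second (R)
    private final double capacity;  // maximum burst (B)
    private double tokens;          // may go negative: requests are admitted, callers are throttled afterwards
    private long lastUpdateMs;

    public TokenBucket(double rate, double capacity, long nowMs) {
        this.rate = rate;
        this.capacity = capacity;
        this.tokens = capacity;
        this.lastUpdateMs = nowMs;
    }

    /** Refill based on elapsed time, capped at the burst capacity. */
    private void refill(long nowMs) {
        tokens = Math.min(capacity, tokens + (nowMs - lastUpdateMs) / 1000.0 * rate);
        lastUpdateMs = nowMs;
    }

    /** Always admit the request, but report whether the caller must now be throttled. */
    public boolean recordAndCheck(double cost, long nowMs) {
        refill(nowMs);
        tokens -= cost;        // a 560-partition request drops the bucket from 500 to -60
        return tokens >= 0;    // false => subsequent requests are throttled
    }

    /** Time until the bucket is non-negative again, i.e. the throttle duration. */
    public double throttleTimeSec() {
        return tokens >= 0 ? 0 : -tokens / rate;   // 60 / 5 = 12 seconds
    }
}
{code}

With Rate = 5 and Tokens = 500, recording a single 560-partition request drives
the bucket to -60 and throttleTimeSec() returns the 12 seconds from the example.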

Best,
David

On Tue, May 12, 2020 at 3:19 PM Tom Bentley  wrote:

> Hi David,
>
> Thanks for the reply.
>
> >> If I understand the proposed throttling algorithm, an initial request
> > would
> > >> be allowed (possibly making K negative) and only subsequent requests
> > >> (before K became positive) would receive the QUOTA_VIOLATED. That
> would
> > >> mean it was still possible to block the controller from handling other
> > >> events – you just need to do so via making one big request.
> >
> > That is correct. One could still create one big topic (not request) and
> > that would create some load on the controller. All options suffer from this
> > issue as we cannot stop clients from creating a very large topic. At least,
> > when it happens, the client will have to wait to pay back its credits, which
> > guarantees that we control the average load on the controller.
> >
>
> I can see that the admission throttling is better than nothing. It's just
> that it doesn't fully solve the problem described in the KIP's motivation.
> That doesn't mean it's not worth doing. I certainly prefer this approach
> over that taken by KIP-578 (which cites the effects of deleting a single
> topic with many partitions).
>
>
> > >> While the reasons for rejecting execution throttling make sense given
> > the
> > >> RPCs we have today that seems to be at the cost of still allowing harm
> > to
> > >> the cluster, or did I misunderstand?
> >
> > Execution throttling would also suffer from large topics being created.
> We
> > have
> > rejected it due to the current RPCs and also because it does not limit
> the
> > amount
> > of work queued up in the controller. Imagine a low quota, that would
> result
> > in a huge
> > backlog of pending operations.
> >
>
> What exactly is the problem with having a huge backlog of pending
> operations? I can see that the backlog would need persisting so that the
> controller could change without losing track of the topics to be mutated,
> and the mutations would need to be submitted in batches to the controller
> event queue (thus allowing other controller requests to be interleaved).  I
> realise this is not feasible right now, I'm just trying to understand if
> it's feasible at all and if there's any appetite for making the requisite
> API changes in the future in order to prevent these problems even for large
> single requests.
>
> Kind regards,
>
> Tom
>


[jira] [Created] (KAFKA-9986) Checkpointing API for State Stores

2020-05-13 Thread Nikolay Izhikov (Jira)
Nikolay Izhikov created KAFKA-9986:
--

 Summary: Checkpointing API for State Stores
 Key: KAFKA-9986
 URL: https://issues.apache.org/jira/browse/KAFKA-9986
 Project: Kafka
  Issue Type: New Feature
Reporter: Nikolay Izhikov


The parent ticket is KAFKA-3184.

The goal of this ticket is to provide a general checkpointing API for state 
stores in Streams (not only for in-memory but also for persistent stores), 
where the checkpoint location can be either local disks or remote storage. 

Design scope is primarily on:

  # the API design for both checkpointing as well as loading checkpoints into 
the local state stores
  # the mechanism of the checkpointing, e.g. whether it should be async? 
whether it should be executed on separate threads? etc. 
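
To make the first point concrete, a purely hypothetical sketch of what such an
interface could look like (the name and methods below are illustrative only and
are not part of this ticket or any KIP):

{code:java}
import java.net.URI;

// Hypothetical sketch only: names and shapes are illustrative, not part of the ticket.
public interface CheckpointableStateStore {

    /**
     * Write a consistent snapshot of the store to the given location
     * (a local directory or a remote storage URI) and return an id for it.
     */
    String checkpoint(URI location);

    /**
     * Restore the store's contents from a previously written checkpoint.
     */
    void loadCheckpoint(URI location, String checkpointId);
}
{code}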



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-05-13 Thread David Jacot
Hi Tom,

>> What exactly is the problem with having a huge backlog of pending
>> operations? I can see that the backlog would need persisting so that the
>> controller could change without losing track of the topics to be mutated,
>> and the mutations would need to be submitted in batches to the controller
>> event queue (thus allowing other controller requests to be
interleaved).  I
>> realise this is not feasible right now, I'm just trying to understand if
>> it's feasible at all and if there's any appetite for making the requisite
>> API changes in the future in order to prevent these problems even for
large
>> single requests.

It is definitely feasible. My concern with the approach is about the way our
current API works. Let me try to illustrate it with an example.

When the admin client sends a CreateTopicsRequest to the controller, the
request goes to the purgatory and waits until all the topics are created or the
timeout specified in the request is reached. If the timeout is reached, a
RequestTimeoutException is returned to the client. This is used to fail the
future of the caller. In conjunction, the admin client fails any pending
request with a TimeoutException after the request timeout is reached (30s by
default). In the former case, the caller will likely retry. In the latter case,
the admin client will automatically retry. In both cases, the broker will
respond with a TopicExistsException.

Having a huge backlog of pending operations will amplify this weird behavior.
Clients will tend to get TopicExistsException errors when they create topics
for the first time, which is really confusing.

I think that our current API is not well suited for this. An asynchronous API
workflow with one API to create/delete and another one to query the status of
completion would be better suited. We can definitely evolve our API towards
this, but we need to figure out a compatibility story for existing clients.
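
Purely for illustration, such an asynchronous workflow could look roughly like
the sketch below on the client side; none of these types exist in the Admin API
today, and the names are made up:

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical sketch only: a "submit, then poll for completion" admin workflow.
// None of these types exist in the Kafka Admin API; names are illustrative.
public interface AsyncTopicAdmin {

    enum OperationStatus { PENDING, IN_PROGRESS, COMPLETED, FAILED }

    /** Submit the topic mutation and return immediately with an operation id. */
    String submitCreateTopics(Collection<NewTopic> newTopics);

    /** Poll the status of a previously submitted operation until it completes. */
    OperationStatus describeOperation(String operationId);
}
{code}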

Another aspect is the fairness among the clients. Imagine a case where one
client continuously creates and deletes topics in a tight loop. This would
flood the queue and delay the creations and deletions of the other clients.
Throttling at admission time mitigates this directly. Throttling at execution
time would need to take this into account to ensure fairness among the clients.
It is a little harder to do this in the controller as the controller is
completely agnostic of the principals and the client ids.

These reasons made me lean towards the current proposal. Does that make
sense?

Best,
David



On Wed, May 13, 2020 at 10:05 AM David Jacot  wrote:

> Hi Jun,
>
> Coming back to your question regarding the differences between the token
> bucket algorithm and our current quota mechanism. I did some tests and
> they confirmed my first intuition that our current mechanism does not work
> well with a bursty workload. Let me try to illustrate the difference with
> an
> example. One important aspect to keep in mind is that we don't want to
> reject requests when the quota is exhausted.
>
> Let's say that we want to guarantee an average rate R=5 partitions/sec
> while
> allowing a burst B=500 partitions.
>
> With our current mechanism, this translates to following parameters:
> - Quota = 5
> - Samples = B / R + 1 = 101 (to allow the burst)
> - Time Window = 1s (the default)
>
> Now, let's say that a client wants to create 7 topics with 80 partitions
> each at
> the time T. It brings the rate to 5.6 (7 * 80 / 100) which is above the
> quota so
> any new request is rejected until the quota gets back above R. In theory,
> the
> client must wait 12 secs ((5.6 - 5) / 5 * 100) to get it back to R. In
> practice, due
> to the sparse samples (one sample worth 560), the rate won't decrease until
> that sample is dropped and it will be after 101 secs. It gets worse if the
> burst
> is increased.
>
> With the token bucket algorithm, this translate to the following
> parameters:
> - Rate = 5
> - Tokens = 500
>
> The same request decreases the number of available tokens to -60 which is
> below 0 so any new request is rejected until the number of available tokens
> gets back above 0. This takes 12 secs ((60+1) / 5).
>
> The token bucket algorithm is more suited for bursty workloads which is our
> case here. I hope that this example helps to clarify the choice.
>
> Best,
> David
>
> On Tue, May 12, 2020 at 3:19 PM Tom Bentley  wrote:
>
>> Hi David,
>>
>> Thanks for the reply.
>>
>> >> If I understand the proposed throttling algorithm, an initial request
>> > would
>> > >> be allowed (possibly making K negative) and only subsequent requests
>> > >> (before K became positive) would receive the QUOTA_VIOLATED. That
>> would
>> > >> mean it was still possible to block the controller from handling
>> other
>> > >> events – you just need to do so via making one big request.
>> >
>> > That is correct. One could still create one big topic (not request) and
>> > that would
>> > create some load on the controller. All options suffer from 

Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-05-13 Thread Tom Bentley
Hi David,

Thanks for the explanation and confirmation that evolving the APIs is not
off the table in the longer term.

Kind regards,

Tom


[VOTE] KIP 585: Filter and conditional SMTs

2020-05-13 Thread Tom Bentley
Hi,

I'd like to start a vote on KIP-585: Filter and conditional SMTs

https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs

Those involved in the discussion seem to be positively disposed to the
idea, but in the absence of any committer participation it's been difficult
to find a consensus on how these things should be configured. What's
presented here seemed to be the option which people preferred overall.

Kind regards,

Tom


Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-05-13 Thread Bruno Cadonna
Hi Sophie,

Thank you for your feedback!

Please find my comments inline.

On Mon, May 11, 2020 at 9:29 PM Sophie Blee-Goldman  wrote:
>
> Hey Bruno,
>
> Thanks for the KIP! I have one high-level concern, which is that we should
> consider
> reporting these metrics on the per-store level rather than instance-wide. I
> know I was
> the one who first proposed making it instance-wide, so bear with me:
>
> While I would still argue that the instance-wide memory usage is probably
> the most *useful*,
> exposing them at the store-level does not prevent users from monitoring the
> instance-wide
> memory. They should be able to roll up all the store-level metrics on an
> instance to
> compute the total off-heap memory. But rolling it up for the users does
> prevent them from
> using this to debug rare cases where one store may be using significantly
> more memory than
> expected.
>

Having the metrics on store-level makes sense to me. I also agree that
users can make the roll-up themselves.
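
As a rough illustration of that roll-up on the application side (the metric
group and the metric name are assumptions here, since the final names were not
yet fixed in the KIP):

{code:java}
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class OffHeapMemoryRollup {

    // Sums a store-level memory metric across all state stores of one KafkaStreams
    // instance. The group ("stream-state-metrics") and the metric name passed in are
    // assumptions for illustration; the KIP had not finalized the metric names yet.
    public static double totalOffHeapBytes(KafkaStreams streams, String memoryMetricName) {
        double total = 0.0;
        for (Metric metric : streams.metrics().values()) {
            MetricName name = metric.metricName();
            if ("stream-state-metrics".equals(name.group()) && memoryMetricName.equals(name.name())) {
                Object value = metric.metricValue();
                if (value instanceof Number) {
                    total += ((Number) value).doubleValue();
                }
            }
        }
        return total;
    }
}
{code}

For example, totalOffHeapBytes(streams, "size-all-mem-tables") would sum that
(assumed) store-level metric into an instance-wide value.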

> It's also worth considering that some users may be using the bounded memory
> config setter
> to put a cap on the off-heap memory of the entire process, in which case
> the memory usage
> metric for any one store should reflect the memory usage of the entire
> instance. In that case
> any effort to roll up the memory usages ourselves would just be wasted.
>

We would not roll up the metrics ourselves but use the utilities of RocksDB. I
guess those utilities would do the roll-up correctly and without wasted effort,
also in the case of the bounded memory config setter. Of course, we would need
to verify those utilities.

> Sorry for the reversal, but after a second thought I'm pretty strongly in
> favor of reporting these
> at the store level.
>
> Best,
> Sophie
>
> On Wed, May 6, 2020 at 8:41 AM Bruno Cadonna  wrote:
>
> > Hi all,
> >
> > I'd like to discuss KIP-607 that aims to add RocksDB memory usage
> > metrics to Kafka Streams.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Record+the+Memory+Used+by+RocksDB+to+Kafka+Streams
> >
> > Best,
> > Bruno
> >


Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-05-13 Thread Bruno Cadonna
Hi Guozhang,

The recording level is set on sensor level. At the moment, I do not
see any reason why we could not set store-level metrics to INFO. So I
would propose putting the metrics on store-level and setting the
recording level to INFO in the KIP. If it does not work due to
technical issues, we can re-discuss it. WDYT?

Best,
Bruno




On Tue, May 12, 2020 at 12:33 AM Guozhang Wang  wrote:
>
> Hello Sophie / Bruno,
>
> I've also thought about the leveling question, and one motivation I had for
> setting it in instance-level is that we want to expose it in INFO level:
> today our report leveling is not very finer grained --- which I think is
> sth. worth itself --- such that one have to either turn on all DEBUG
> metrics recording or none of them. If we can allow users to e.g. specify
> "turn on streams-metrics and stream-state-metrics, but not others" and then
> I think it should be just at store-level. However, right now if we want to
> set it as store-level then it would be DEBUG and not exposed by default.
>
> So it seems we have several options in addition to the proposed one:
>
> a) we set it at store-level as INFO; but then one can argue why this is
> INFO while others (bytes-written, etc) are DEBUG.
> b) we set it at store-level as DEBUG, believing that we do not usually need
> to turn it on.
> c) maybe, we can set it at task-level (? I'm not so sure myself about
> this.. :P) as INFO.
>
>
> Guozhang
>
>
>
>
> On Mon, May 11, 2020 at 12:29 PM Sophie Blee-Goldman 
> wrote:
>
> > Hey Bruno,
> >
> > Thanks for the KIP! I have one high-level concern, which is that we should
> > consider
> > reporting these metrics on the per-store level rather than instance-wide. I
> > know I was
> > the one who first proposed making it instance-wide, so bear with me:
> >
> > While I would still argue that the instance-wide memory usage is probably
> > the most *useful*,
> > exposing them at the store-level does not prevent users from monitoring the
> > instance-wide
> > memory. They should be able to roll up all the store-level metrics on an
> > instance to
> > compute the total off-heap memory. But rolling it up for the users does
> > prevent them from
> > using this to debug rare cases where one store may be using significantly
> > more memory than
> > expected.
> >
> > It's also worth considering that some users may be using the bounded memory
> > config setter
> > to put a cap on the off-heap memory of the entire process, in which case
> > the memory usage
> > metric for any one store should reflect the memory usage of the entire
> > instance. In that case
> > any effort to roll up the memory usages ourselves would just be wasted.
> >
> > Sorry for the reversal, but after a second thought I'm pretty strongly in
> > favor of reporting these
> > at the store level.
> >
> > Best,
> > Sophie
> >
> > On Wed, May 6, 2020 at 8:41 AM Bruno Cadonna  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to discuss KIP-607 that aims to add RocksDB memory usage
> > > metrics to Kafka Streams.
> > >
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Record+the+Memory+Used+by+RocksDB+to+Kafka+Streams
> > >
> > > Best,
> > > Bruno
> > >
> >
>
>
> --
> -- Guozhang


Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-05-13 Thread Bruno Cadonna
Hi John,

Thank you for the feedback!

I agree and I will change the KIP as I stated in my previous e-mail to Guozhang.

Best,
Bruno

On Tue, May 12, 2020 at 3:07 AM John Roesler  wrote:
>
> Thanks, all.
>
> If you don’t mind, I’ll pitch in a few cents’ worth.
>
> In my life I’ve generally found more granular metrics to be more useful, as 
> long as there’s a sane way to roll them up. It does seem nice to see it on 
> the per-store level. For roll-up purposes, the task and thread tags should be 
> sufficient.
>
> I think the only reason we make some metrics Debug is that _recording_ them 
> can be expensive. If there’s no added expense, I think we can just register 
> store-level metrics at Info level.
>
> Thanks for the KIP, Bruno!
> -John
>
> On Mon, May 11, 2020, at 17:32, Guozhang Wang wrote:
> > Hello Sophie / Bruno,
> >
> > I've also thought about the leveling question, and one motivation I had for
> > setting it in instance-level is that we want to expose it in INFO level:
> > today our report leveling is not very finer grained --- which I think is
> > sth. worth itself --- such that one have to either turn on all DEBUG
> > metrics recording or none of them. If we can allow users to e.g. specify
> > "turn on streams-metrics and stream-state-metrics, but not others" and then
> > I think it should be just at store-level. However, right now if we want to
> > set it as store-level then it would be DEBUG and not exposed by default.
> >
> > So it seems we have several options in addition to the proposed one:
> >
> > a) we set it at store-level as INFO; but then one can argue why this is
> > INFO while others (bytes-written, etc) are DEBUG.
> > b) we set it at store-level as DEBUG, believing that we do not usually need
> > to turn it on.
> > c) maybe, we can set it at task-level (? I'm not so sure myself about
> > this.. :P) as INFO.
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Mon, May 11, 2020 at 12:29 PM Sophie Blee-Goldman 
> > wrote:
> >
> > > Hey Bruno,
> > >
> > > Thanks for the KIP! I have one high-level concern, which is that we should
> > > consider
> > > reporting these metrics on the per-store level rather than instance-wide. 
> > > I
> > > know I was
> > > the one who first proposed making it instance-wide, so bear with me:
> > >
> > > While I would still argue that the instance-wide memory usage is probably
> > > the most *useful*,
> > > exposing them at the store-level does not prevent users from monitoring 
> > > the
> > > instance-wide
> > > memory. They should be able to roll up all the store-level metrics on an
> > > instance to
> > > compute the total off-heap memory. But rolling it up for the users does
> > > prevent them from
> > > using this to debug rare cases where one store may be using significantly
> > > more memory than
> > > expected.
> > >
> > > It's also worth considering that some users may be using the bounded 
> > > memory
> > > config setter
> > > to put a cap on the off-heap memory of the entire process, in which case
> > > the memory usage
> > > metric for any one store should reflect the memory usage of the entire
> > > instance. In that case
> > > any effort to roll up the memory usages ourselves would just be wasted.
> > >
> > > Sorry for the reversal, but after a second thought I'm pretty strongly in
> > > favor of reporting these
> > > at the store level.
> > >
> > > Best,
> > > Sophie
> > >
> > > On Wed, May 6, 2020 at 8:41 AM Bruno Cadonna  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to discuss KIP-607 that aims to add RocksDB memory usage
> > > > metrics to Kafka Streams.
> > > >
> > > >
> > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Record+the+Memory+Used+by+RocksDB+to+Kafka+Streams
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >


Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-13 Thread Rajini Sivaram
Hi Cheng,

Thanks for the KIP, sounds like a good improvement. A couple of comments:

1) We currently have client connection timeouts on the broker with configs
named `xxx.socket.timeout.ms` (e.g. controller.socket.timeout.ms). I think
they started off as connection timeouts but now include authentication time
as well. Have we considered using similar configs for this case? We may
want to prefix the new config with `socket.` anyway - something along the
lines of `socket.connection.timeout.ms` if it is just the connection time.

2) The KIP proposes 10s as the default. What does this mean for typical
connections like a produce request going to the leader? Instead of one
connection attempt to the leader, do we want three separate connection
attempts within the request timeout to the leader?
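
For concreteness, this is roughly how a client might set the proposed timeout
if it ends up with the `socket.`-prefixed name discussed above; both the name
and the default were still under discussion:

{code:java}
import java.util.Properties;

public class ConnectionTimeoutConfigSketch {
    // Sketch only: assumes the `socket.`-prefixed config name discussed in this thread
    // and the proposed 10s default; neither was finalized at this point.
    public static Properties clientProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        // Give up on a connection attempt after 10s instead of relying on the OS
        // default (~127s on Linux with tcp_syn_retries=6), which leaves room for
        // roughly three attempts within a default 30s request timeout.
        props.put("socket.connection.timeout.ms", "10000");
        return props;
    }
}
{code}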

Regards,

Rajini


On Thu, May 7, 2020 at 11:51 PM Jose Garcia Sancio 
wrote:

> Cheng,
>
> Thanks for the KIP and the detailed proposal section. LGTM!
>
> On Thu, May 7, 2020 at 3:38 PM Cheng Tan  wrote:
> >
> > I think more about the potential wider use cases, I modified the
> proposal to target all the connection. Thanks.
> >
> > - Best, - Cheng Tan
> >
> > > On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
> > >
> > > Hi Colin,
> > >
> > > Sorry for the confusion. I’m proposing to implement timeout in the
> NetworkClient.leastLoadedNode() when iterating all the cached node. The
> alternative I can think is to implement the timeout in NetworkClient.poll()
> > >
> > > I’d prefer to implement in the leastLoadedNode(). Here’re the reasons:
> > > Usually when clients send a request, they will asking the network
> client to send the request to a specific node. In this case, the
> connection.setup.timeout won’t matter too much because the client doesn’t
> want to try other nodes for that specific request. The request level
> timeout would be enough. The metadata fetcher fetches the nodes status
> periodically so the clients can reassign the request to another node after
> timeout.
> > > Consumer, producer, and AdminClient are all using leastLoadedNode()
> for metadata fetch, where the connection setup timeout can play an
> important role. Unlike other requests can refer to the metadata for node
> condition, the metadata requests can only blindly choose a node for retry
> in the worst scenario. We want to make sure the client can get the metadata
> smoothly and as soon as possible. As a result, we need this
> connection.setup.timeout.
> > > Implementing the timeout in poll() or anywhere else might need an
> extra iteration of all nodes, which might downgrade the network client
> performance.
> > > I also updated the KIP content and KIP status. Please let me know if
> the above ideas make sense. Thanks.
> > >
> > > Best, - Cheng Tan
> > >
> > >
> > >
> > >> On May 4, 2020, at 5:26 PM, Colin McCabe  cmcc...@apache.org>> wrote:
> > >>
> > >> Hi Cheng,
> > >>
> > >> On the KIP page, it lists this KIP as "draft."  It seems like "under
> discussion" is appropriate here, right?
> > >>
> > >>> Currently, the initial socket connection timeout is depending on
> Linux kernel setting
> > >>> tcp_syn_retries. The timeout value is 2 ^ (tcp_sync_retries + 1) - 1
> seconds. For the
> > >>> reasons below, we want to control the client-side socket timeout
> directly using
> > >>> configuration files
> > >>
> > >> Linux is just one example of an OS that Kafka could run on, right?
> You could also be running on MacOS, for example.
> > >>
> > >>> I'm proposing to do a lazy socket connection time out. That is, we
> only check if
> > >>> we need to timeout a socket when we consider the corresponding node
> as a
> > >>> candidate in the node provider.
> > >>
> > >> The NodeProvider is an AdminClient abstraction, right?  Why wouldn't
> we implement a connection setup timeout for all clients, not just
> AdminClient?
> > >>
> > >> best,
> > >> Colin
> > >>
> > >> On Mon, May 4, 2020, at 13:18, Colin McCabe wrote:
> > >>> Hmm.  A big part of the reason behind the KIP is that the default
> > >>> connection timeout behavior of the OS doesn't work for Kafka, right?
> > >>> For example, on Linux, if we wait 127 seconds for a connection
> attempt
> > >>> to time out, we won't get a chance to make another attempt in most
> > >>> cases.  So I think it makes sense to set a shorter default.
> > >>>
> > >>> best,
> > >>> Colin
> > >>>
> > >>>
> > >>> On Mon, May 4, 2020, at 09:44, Jose Garcia Sancio wrote:
> >  Thanks for the KIP Cheng,
> > 
> > > The default value will be 10 seconds.
> > 
> >  I think we should make the default the current behavior. Meaning the
> >  default should leverage the default connect timeout from the
> operating
> >  system.
> > 
> > > Proposed Changes
> > 
> >  I don't fully understand this section. It seems like it is mainly
> >  focused on the problem with the current implementation. Can you
> >  explain how the proposed changes solve the problem?
> > 
> >  Thanks.
> > 
> > >>

[jira] [Resolved] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses

2020-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9798.
--
Resolution: Not A Problem

> Flaky test: 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
> 
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Bill Bejeck
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, test
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8989) Embedded broker could not be reached in unit test

2020-05-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8989.

Resolution: Won't Fix

> Embedded broker could not be reached in unit test
> -
>
> Key: KAFKA-8989
> URL: https://issues.apache.org/jira/browse/KAFKA-8989
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-05-13 Thread Sophie Blee-Goldman
Thanks Bruno! I took a look at the revised KIP and it looks good to me.

Sophie

On Wed, May 13, 2020 at 6:59 AM Bruno Cadonna  wrote:

> Hi John,
>
> Thank you for the feedback!
>
> I agree and I will change the KIP as I stated in my previous e-mail to
> Guozhang.
>
> Best,
> Bruno
>
> On Tue, May 12, 2020 at 3:07 AM John Roesler  wrote:
> >
> > Thanks, all.
> >
> > If you don’t mind, I’ll pitch in a few cents’ worth.
> >
> > In my life I’ve generally found more granular metrics to be more useful,
> as long as there’s a sane way to roll them up. It does seem nice to see it
> on the per-store level. For roll-up purposes, the task and thread tags
> should be sufficient.
> >
> > I think the only reason we make some metrics Debug is that _recording_
> them can be expensive. If there’s no added expense, I think we can just
> register store-level metrics at Info level.
> >
> > Thanks for the KIP, Bruno!
> > -John
> >
> > On Mon, May 11, 2020, at 17:32, Guozhang Wang wrote:
> > > Hello Sophie / Bruno,
> > >
> > > I've also thought about the leveling question, and one motivation I
> had for
> > > setting it in instance-level is that we want to expose it in INFO
> level:
> > > today our report leveling is not very finer grained --- which I think
> is
> > > sth. worth itself --- such that one have to either turn on all DEBUG
> > > metrics recording or none of them. If we can allow users to e.g.
> specify
> > > "turn on streams-metrics and stream-state-metrics, but not others" and
> then
> > > I think it should be just at store-level. However, right now if we
> want to
> > > set it as store-level then it would be DEBUG and not exposed by
> default.
> > >
> > > So it seems we have several options in addition to the proposed one:
> > >
> > > a) we set it at store-level as INFO; but then one can argue why this is
> > > INFO while others (bytes-written, etc) are DEBUG.
> > > b) we set it at store-level as DEBUG, believing that we do not usually
> need
> > > to turn it on.
> > > c) maybe, we can set it at task-level (? I'm not so sure myself about
> > > this.. :P) as INFO.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Mon, May 11, 2020 at 12:29 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > > wrote:
> > >
> > > > Hey Bruno,
> > > >
> > > > Thanks for the KIP! I have one high-level concern, which is that we
> should
> > > > consider
> > > > reporting these metrics on the per-store level rather than
> instance-wide. I
> > > > know I was
> > > > the one who first proposed making it instance-wide, so bear with me:
> > > >
> > > > While I would still argue that the instance-wide memory usage is
> probably
> > > > the most *useful*,
> > > > exposing them at the store-level does not prevent users from
> monitoring the
> > > > instance-wide
> > > > memory. They should be able to roll up all the store-level metrics
> on an
> > > > instance to
> > > > compute the total off-heap memory. But rolling it up for the users
> does
> > > > prevent them from
> > > > using this to debug rare cases where one store may be using
> significantly
> > > > more memory than
> > > > expected.
> > > >
> > > > It's also worth considering that some users may be using the bounded
> memory
> > > > config setter
> > > > to put a cap on the off-heap memory of the entire process, in which
> case
> > > > the memory usage
> > > > metric for any one store should reflect the memory usage of the
> entire
> > > > instance. In that case
> > > > any effort to roll up the memory usages ourselves would just be
> wasted.
> > > >
> > > > Sorry for the reversal, but after a second thought I'm pretty
> strongly in
> > > > favor of reporting these
> > > > at the store level.
> > > >
> > > > Best,
> > > > Sophie
> > > >
> > > > On Wed, May 6, 2020 at 8:41 AM Bruno Cadonna 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I'd like to discuss KIP-607 that aims to add RocksDB memory usage
> > > > > metrics to Kafka Streams.
> > > > >
> > > > >
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Record+the+Memory+Used+by+RocksDB+to+Kafka+Streams
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
>


[jira] [Created] (KAFKA-9987) Add new cooperative assignor optimized for constant-subscription group

2020-05-13 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9987:
--

 Summary: Add new cooperative assignor optimized for constant-subscription group
 Key: KAFKA-9987
 URL: https://issues.apache.org/jira/browse/KAFKA-9987
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman


In KIP-429 we added the new CooperativeStickyAssignor, which leverages the 
underlying sticky assignment algorithm of the existing StickyAssignor (moved to 
AbstractStickyAssignor). 

 

The algorithm is fairly complex as it tries to optimize stickiness while 
satisfying perfect balance _in the case where individual consumers may be 
subscribed to a random subset of the topics._ While it does a pretty good job at 
what it promises to do, it doesn't scale well with large numbers of consumers and 
partitions. 

 

If we can make the assumption that all consumers are subscribed to the same set 
of topics, we can simplify the algorithm greatly and do a sticky-but-balanced 
assignment in a single pass. It would be nice to have an additional cooperative 
assignor OOTB that performs efficiently for users who know their group will 
satisfy this constraint.
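
For illustration, under the stated assumption (identical subscriptions across
the group) a sticky-but-balanced assignment can be sketched with a couple of
simple linear passes, as below. This is only a rough sketch of the idea, not
the algorithm that was ultimately proposed or implemented:

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ConstrainedStickySketch {

    public static Map<String, List<Integer>> assign(List<Integer> allPartitions,
                                                    Map<String, List<Integer>> previouslyOwned,
                                                    List<String> consumers) {
        // With identical subscriptions, every consumer's target is either floor or
        // ceil of partitions / consumers, so balance can be decided upfront.
        int minQuota = allPartitions.size() / consumers.size();
        int extras = allPartitions.size() % consumers.size();

        Map<String, Integer> quota = new HashMap<>();
        for (int i = 0; i < consumers.size(); i++) {
            quota.put(consumers.get(i), minQuota + (i < extras ? 1 : 0));
        }

        Map<String, List<Integer>> assignment = new HashMap<>();
        Set<Integer> taken = new HashSet<>();

        // Pass 1: each consumer keeps up to its quota of the partitions it already owns.
        for (String consumer : consumers) {
            List<Integer> kept = new ArrayList<>();
            for (Integer p : previouslyOwned.getOrDefault(consumer, Collections.emptyList())) {
                if (kept.size() < quota.get(consumer) && taken.add(p)) {
                    kept.add(p);
                }
            }
            assignment.put(consumer, kept);
        }

        // Pass 2: hand the partitions nobody kept to consumers still below their quota.
        List<Integer> unowned = new ArrayList<>();
        for (Integer p : allPartitions) {
            if (!taken.contains(p)) {
                unowned.add(p);
            }
        }
        Iterator<Integer> it = unowned.iterator();
        for (String consumer : consumers) {
            List<Integer> owned = assignment.get(consumer);
            while (owned.size() < quota.get(consumer) && it.hasNext()) {
                owned.add(it.next());
            }
        }
        return assignment;
    }
}
{code}

The extra partitions from the remainder are handed to the first consumers in
the list purely for brevity; a real implementation would prefer the previous
owners of those partitions to maximize stickiness.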



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread Boyang Chen
Thanks for the KIP Sophie. Getting the E2E latency is important for
understanding the bottleneck of the application.

A couple of questions and ideas:

1. Could you clarify the rationale for picking the 75th, 99th and max percentiles?
Normally I see cases where we use 50, 90 percentile as well in production
systems.

2. The current latency being computed is cumulative, i.e. if a record goes
through A -> B -> C, then P(C) = T(B->C) + P(B) = T(B->C) + T(A->B) + T(A)
and so on, where P() represents the captured latency and T() represents the
time for transiting the record between two nodes, including processing time.
For monitoring purposes, maybe T(B->C) and T(A->B) are more natural to view as
"hop-to-hop latency"; otherwise, if there is a spike in T(A->B), both P(B) and
P(C) are affected at the same time. In the same spirit, the E2E latency is
meaningful only when the record exits from the sink, as this marks the whole
time the record spent inside the funnel. Do you think we could have separate
treatment for sink nodes and other nodes, so that other nodes only count the
time since receiving the record from the last hop? I'm not proposing a solution
here, just want to discuss this alternative to see if it is reasonable.

3. As we are going to monitor late arrival records as well, they would
create some really spiky graphs when the out-of-order records are
interleaving with on-time records. Should we also supply a smoothed version
of the latency metrics, or should users just take care of it themselves?

4. Regarding this new metric, we haven't discussed its relation to our
existing processing latency metrics. Could you add some context comparing
them and a simple `when to use which` guideline?

Boyang

On Tue, May 12, 2020 at 7:28 PM Sophie Blee-Goldman 
wrote:

> Hey all,
>
> I'd like to kick off discussion on KIP-613 which aims to add end-to-end
> latency metrics to Streams. Please take a look:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams
>
> Cheers,
> Sophie
>


[DISCUSS] KIP-615: add ConstrainedCooperativeStickyAssignor

2020-05-13 Thread Sophie Blee-Goldman
Hey all,

I'd like to propose adding another OOTB cooperative assignor to better meet
the needs of some users who don't need the full flexibility of the existing
CooperativeStickyAssignor:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-615%3A+add+ConstrainedCooperativeStickyAssignor

I included a sketch of the assignment algorithm in the KIP to give a rough
idea of what this would look like (and that it is possible), but this is
obviously an implementation detail and subject to change.

Cheers,
Sophie


[jira] [Resolved] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2020-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9897.
--
Resolution: Fixed

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-587 Suppress detailed responses for handled exceptions in security-sensitive environments

2020-05-13 Thread Christopher Egerton
Hi Connor,

I think this is really close but have one more thought. Uncaught exceptions
in the REST API are different from exceptions that come about when tasks or
connectors fail, and can be used for different purposes. Stack traces in
500 errors are probably only useful for the administrator of the Connect
cluster. However, if a user has tried to create a connector and sees that
it or one of its tasks has failed, a brief message about the cause of
failure might actually be pretty helpful. If they can't get any information
on why the connector or task failed, then they're essentially at the mercy
of the Connect cluster administrator for figuring out what caused the
failure. Would it be alright to include the exception message, but not the
entire stack trace, in the response for requests to view the status of a
connector or task?
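
To illustrate the message-versus-trace distinction in REST error responses,
here is a sketch only (gated by a hypothetical flag along the lines of the
enable.rest.response.stack.traces name discussed in this thread); it is not
Connect's actual ExceptionMapper:

{code:java}
import java.io.PrintWriter;
import java.io.StringWriter;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;

// Sketch only: returns the exception message while dropping the stack trace
// when the (hypothetical) worker config disables traces.
public class MessageOnlyExceptionMapper implements ExceptionMapper<Exception> {

    private final boolean includeStackTraces;

    public MessageOnlyExceptionMapper(boolean includeStackTraces) {
        this.includeStackTraces = includeStackTraces;
    }

    @Override
    public Response toResponse(Exception exception) {
        String body = includeStackTraces ? stackTrace(exception) : exception.getMessage();
        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
                       .entity(body)
                       .build();
    }

    private static String stackTrace(Exception e) {
        StringWriter sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}
{code}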

Cheers,

Chris

On Wed, May 6, 2020 at 12:07 PM Connor Penhale 
wrote:

> Hi Chris,
>
> Apologies for the name confusion! I've been working with my customer
> sponsor over the last few weeks, and we finally have an answer regarding
> "only exceptions or all responses." This organization is really interested
> in removing stack traces from all responses, which will expand the scope of
> this KIP a bit. I'm going to update the wiki entry, and then would it be
> reasonable to call for a vote?
>
> Thanks!
> Connor
>
> On 4/17/20, 3:53 PM, "Christopher Egerton"  wrote:
>
> Hi Connor,
>
> That's great, but I think you may have mistaken Colin for me :)
>
> One more thing that should be addressed--the "public interfaces"
> section
> isn't just for Java interfaces, it's for any changes to any public
> part of
> Kafka that users and external developers interact with. As far as
> Connect
> is concerned, this includes (but is not limited to) the REST API and
> worker
> configuration properties, so it might be worth briefly summarizing the
> scope of your proposed changes in that section (something like "We
> plan on
> adding a new worker config named  that will affect the REST API
> under
> ".
>
> Cheers,
>
> Chris
>
> On Wed, Apr 15, 2020 at 1:00 PM Connor Penhale 
> wrote:
>
> > Hi Chris,
> >
> > I can ask the customer if they can disclose any additional
> information. I
> > provided the information around "PCI-DSS" to give the community a
> flavor of
> > the type of environment the customer was operating in. The current
> mode is
> > /not/ insecure, I would agree with this. I would be willing to agree
> that
> > my customer has particular security audit requirements that go above
> and
> > beyond what most environments would consider reasonable. Are you
> > comfortable with that language?
> >
> > " enable.rest.response.stack.traces" works great for me!
> >
> > I created a new class in the example PR because I wanted the highest
> > chance of not gunking up the works by stepping on toes in an
> important
> > class. I figured I'd be reducing risk by creating an alternative
> > implementing class. In retrospect, and now that I'm getting a
> first-hand
> > look at Kafka's community process, that is probably unnecessary.
> > Additionally, I would agree with your statement that we should
> modify the
> > existing ExceptionMapper to avoid behavior divergence in subsequent
> > releases and ensure this feature's particular scope is easy to
> maintain.
> >
> > Thanks!
> > Connor
> >
> > On 4/15/20, 1:17 PM, "Colin McCabe"  wrote:
> >
> > Hi Connor,
> >
> > I still would like to hear more about whether this feature is
> required
> > for PCI-DSS or any other security certification.  Nobody I talked to
> seemed
> > to think that it was-- if there are certifications that would
> require this,
> > it would be nice to know.  However, I don't object to implementing
> this as
> > long as we don't imply that the current mode is insecure.
> >
> > What do you think about using
> "enable.rest.response.stack.traces" as
> > the config name?  It seems like that  makes it clearer that it's a
> boolean
> > value.
> >
> > It's not really necessary to describe the internal
> implementation in
> > the KIP, but since you mentioned it, it's probably worth considering
> using
> > the current ExceptionMapper class with a different configuration
> rather
> > than creating a new one.
> >
> > best,
> > Colin
> >
> >
> > On Mon, Apr 13, 2020, at 09:04, Connor Penhale wrote:
> > > Hi Chris!
> > >
> > > RE: SSL, indeed, the issue is not that the information is not
> > > encrypted, but that there is no authorization layer.
> > >
> > > I'll be sure to edit the KIP as we continue discussion!
> > >
> > > RE: the 200 response you highlighted, great catch! I'll work
> with my
> > > c

Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread Sophie Blee-Goldman
1. I felt that 50% was not a particularly useful gauge for this specific
metric, as it's presumably most useful at putting an *upper* bound on the
latency you can reasonably expect to see. I chose percentiles that would
hopefully give a good sense of what *most* records will experience, and what
*close to all* records will.

However, I'm not married to these specific numbers and could be convinced.
I would be especially interested in hearing from users on this.

2. I'm inclined to not include the "hop-to-hop latency" in this KIP since
users can always compute it themselves by subtracting the previous node's
end-to-end latency. I guess we could do it either way since you can always
compute one from the other, but I think the end-to-end latency feels more
valuable, as its main motivation is not to debug bottlenecks in the topology
but to give users a sense of how long it takes a record to be reflected in
certain parts of the topology. For example, this might be useful for users
who are wondering roughly when a record that was just produced will be
included in their IQ results. Debugging is just a nice side effect -- but
maybe I didn't make that clear enough in the KIP's motivation.

3. Good question, I should address this in the KIP. The short answer is
"yes",
we will include late records. I added a paragraph to the end of the Proposed
Changes section explaining the reasoning here, please let me know if you
have
any concerns.

4. Assuming you're referring to the existing metric "process-latency", that
metric
reflects the time for the literal Node#process method to run whereas this
metric
would always be measured relative to the event timestamp.

That said, the naming collision there is pretty confusing so I've renamed
the
metrics in this KIP to "end-to-end-latency" which I feel better reflects
the nature
of the metric anyway.
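
Regarding (2) above, a tiny sketch of the arithmetic involved (the helper names
are placeholders, not part of the KIP):

{code:java}
// Placeholder sketch of the arithmetic discussed in (2); not part of the KIP.
public class EndToEndLatencyMath {

    /** End-to-end latency as proposed: wall-clock time minus the record's event timestamp. */
    public static long endToEndLatencyMs(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs;
    }

    /** The hop-to-hop latency can be recovered by subtracting the upstream node's value. */
    public static long hopLatencyMs(long upstreamEndToEndMs, long downstreamEndToEndMs) {
        return downstreamEndToEndMs - upstreamEndToEndMs;
    }
}
{code}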

Thanks for the feedback!

On Wed, May 13, 2020 at 10:21 AM Boyang Chen 
wrote:

> Thanks for the KIP Sophie. Getting the E2E latency is important for
> understanding the bottleneck of the application.
>
> A couple of questions and ideas:
>
> 1. Could you clarify the rational of picking 75, 99 and max percentiles?
> Normally I see cases where we use 50, 90 percentile as well in production
> systems.
>
> 2. The current latency being computed is cumulative, I.E if a record goes
> through A -> B -> C, then P(C) = T(B->C) + P(B) = T(B->C) + T(A->B) + T(A)
> and so on, where P() represents the captured latency, and T() represents
> the time for transiting the records between two nodes, including processing
> time. For monitoring purpose, maybe having T(B->C) and T(A->B) are more
> natural to view as "hop-to-hop latency", otherwise if there is a spike in
> T(A->B), both P(B) and P(C) are affected in the same time. In the same
> spirit, the E2E latency is meaningful only when the record exits from the
> sink as this marks the whole time this record spent inside the funnel. Do
> you think we could have separate treatment for sink nodes and other
> nodes, so that other nodes only count the time receiving the record from
> last hop? I'm not proposing a solution here, just want to discuss this
> alternative to see if it is reasonable.
>
> 3. As we are going to monitor late arrival records as well, they would
> create some really spiky graphs when the out-of-order records are
> interleaving with on time records. Should we also supply a smooth version
> of the latency metrics, or user should just take care of it by themself?
>
> 4. Regarding this new metrics, we haven't discussed its relation with our
> existing processing latency metrics, could you add some context on
> comparison and a simple `when to use which` tutorial for the best?
>
> Boyang
>
> On Tue, May 12, 2020 at 7:28 PM Sophie Blee-Goldman 
> wrote:
>
> > Hey all,
> >
> > I'd like to kick off discussion on KIP-613 which aims to add end-to-end
> > latency metrics to Streams. Please take a look:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams
> >
> > Cheers,
> > Sophie
> >
>


Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-13 Thread Anna Povzner
Hi David and Alexandre,

Thanks so much for your feedback! Here are my answers:

1. Yes, we have seen several cases of clients that create a new connection
per produce/consume request. One hypothesis is someone who is used to
connection pooling may accidentally write a Kafka client that creates a new
connection every time.

2 & 4. That's a good point I haven't considered. I think it makes sense to
provide an ability to limit connection creations per IP as well. This is
not hard to implement -- the broker already keeps track of the number of
connections per IP, and immediately closes a new connection if it comes
from an IP that reached the connection limit. So, we could additionally
track the rate, and close the connection from IP that exceeds the rate. One
slight concern is whether keeping track of per IP rates and quotas adds
overhead (CPU and memory). But perhaps it is not a problem if we use
expiring sensors.

It would still make sense to limit the overall connection creation rate for
the Kafka clusters which are shared among many different
applications/clients, since they may spike at the same time bringing the
total rate too high.

3. Controlling connection queue sizes only controls the share of time
network threads use for creating new connections (and accepting on Acceptor
thread) vs. doing other work on each Processor iteration. It does not
directly control how processing connection creations would be related to
other processing done by brokers like on request handler threads. So, while
controlling queue size may mitigate the issue for some of the workloads, it
does not guarantee that. Plus, if we want to limit how many connections are
created per IP, the queue size approach would not work, unless we go with a
"share" of the queue, which I think even further obscures what that setting
means (and what we would achieve as an end result). Does this answer the
question?

If there are no objections, I will update the KIP to add per IP connection
rate limits (config and enforcement).
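
To illustrate what the per-IP tracking amounts to, here is a deliberately
simplified sketch (fixed one-second windows, no expiry of idle entries); the
actual broker implementation would reuse the existing quota/sensor machinery
rather than anything like this class:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Back-of-the-envelope sketch of per-IP connection-rate accounting, just to show
// the bookkeeping involved; not the proposed broker implementation.
public class PerIpConnectionRateCheck {

    private static final class Window {
        long windowStartMs;
        int acceptedInWindow;
    }

    private final int maxConnectionsPerSecondPerIp;
    private final Map<String, Window> windows = new HashMap<>();

    public PerIpConnectionRateCheck(int maxConnectionsPerSecondPerIp) {
        this.maxConnectionsPerSecondPerIp = maxConnectionsPerSecondPerIp;
    }

    /** Returns true if a new connection from this IP should be accepted, false if it should be closed. */
    public synchronized boolean tryAccept(String ip, long nowMs) {
        Window w = windows.computeIfAbsent(ip, k -> new Window());
        if (nowMs - w.windowStartMs >= 1000) {
            // start a new one-second window for this IP
            w.windowStartMs = nowMs;
            w.acceptedInWindow = 0;
        }
        if (w.acceptedInWindow >= maxConnectionsPerSecondPerIp) {
            return false;
        }
        w.acceptedInWindow++;
        return true;
    }
}
{code}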

Thanks,

Anna


On Tue, May 12, 2020 at 11:25 AM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hello,
>
> Thank you for the KIP.
>
> I experienced in the past genuine broker brownouts due to connection
> storms consuming most of the CPU available on the server and I think
> it is useful to protect against it.
>
> I tend to share the questions asked in points 2 and 4 from David. Is
> there still a risk of denial of service if the limit applies at the
> listener-level without differentiating between (an) “offending”
> client(s) and the others?
>
> To rebound on point 3 - conceptually one difference between capping
> the queue size or throttling as presented in the KIP would come from
> the time it takes to accept a connection and how that time evolves
> with the connection rate.
> Assuming that that time increases monotonically with resource
> utilization, the admissible rate of connections would decrease as the
> server becomes more loaded, if the limit was set on queue size.
>
> Thanks,
> Alexandre
>
> Le mar. 12 mai 2020 à 08:49, David Jacot  a écrit :
> >
> > Hi Anna,
> >
> > Thanks for the KIP! I have few questions:
> >
> > 1. You mention that some clients may create a new connections for each
> > requests: "Another example is clients that create a new connection for
> each
> > produce/consume request". I am curious here but do we know any clients
> > behaving like this?
> >
> > 2. I am a bit concerned by the impact of misbehaving clients on the other
> > ones. Let's say that we define a quota of 10 connections / sec for a
> broker
> > and that we have a misbehaving application constantly trying to create 20
> > connections on that broker. That application will constantly hit the
> quota
> > and
> > always have many pending connections in the queue waiting to be accepted.
> > Regular clients trying to connect would need to wait until all the
> pending
> > connections upfront in the queue are drained in the best case scenario or
> > won't be able to connect at all in the worst case scenario if the queue
> is
> > full.
> > Does it sound like a valid concern? How do you see this?
> >
> > 3. As you mention it in the KIP, we use bounded queues which already
> limit
> > the maximum number of connections that can be accepted. I wonder if we
> > could reach the same goal by making the size of the queue configurable.
> >
> > 4. Did you consider doing something similar to the connections quota
> which
> > limits the number of connections per IP? Instead of rate limiting all the
> > creation,
> > we could perhaps rate limit the number of creation per IP as well. That
> > could
> > perhaps reduce the effect on the other clients. That may be harder to
> > implement
> > though.
> >
> > Best,
> > David
> >
> > On Mon, May 11, 2020 at 7:58 PM Anna Povzner  wrote:
> >
> > > Hi,
> > >
> > > I just created KIP-612 to allow limiting connection creation rate on
> > > brokers, and would like to start a discussion.
> > >
> > >
> >

[VOTE] KIP-606: Add Metadata Context to MetricsReporter

2020-05-13 Thread Xavier Léauté
Hi everyone,

Folks seem happy with the state of the KIP, so I'd like to start the vote
for KIP-606
https://cwiki.apache.org/confluence/display/KAFKA/KIP-606%3A+Add+Metadata+Context+to+MetricsReporter

- Xavier


[jira] [Created] (KAFKA-9988) Log incorrectly reports task has failed when task takes too long to shutdown

2020-05-13 Thread Sanjana Kaundinya (Jira)
Sanjana Kaundinya created KAFKA-9988:


 Summary: Log incorrectly reports task has failed when task takes 
too long to shutdown
 Key: KAFKA-9988
 URL: https://issues.apache.org/jira/browse/KAFKA-9988
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sanjana Kaundinya


If the OffsetStorageReader is closed while the task is trying to shut down, and 
the task is trying to access the offsets from the OffsetStorageReader, then we 
see the following in the logs.

{code:java}
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task threw 
an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
closed while attempting to read offsets. This is likely because the task was 
been scheduled to stop but has taken longer than the graceful shutdown period 
to do so.
at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
... 14 more
[2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=replicator-18} Task is 
being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

This is a bit misleading, because the task is already on its way to being shut 
down and doesn't actually need manual intervention to be restarted. We can see 
this later in the logs, where it throws another unrecoverable exception.

{code:java}
[2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=replicator-18} Task threw 
an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
{code}

If we know a task is on its way to shutting down, we should not throw a 
ConnectException and should instead log a warning, so that we don't log false 
negatives.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-606: Add Metadata Context to MetricsReporter

2020-05-13 Thread Gwen Shapira
+1 (binding)
Thanks for the proposal, Xavier.

On Wed, May 13, 2020 at 11:54 AM Xavier Léauté  wrote:

> Hi everyone,
>
> Folks seem happy with the state of the KIP, so I'd like to start the vote
> for KIP-606
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-606%3A+Add+Metadata+Context+to+MetricsReporter
>
> - Xavier
>


-- 
Gwen Shapira


[jira] [Resolved] (KAFKA-9966) Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9966.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Flaky Test EosBetaUpgradeIntegrationTest#shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-9966
> URL: https://issues.apache.org/jira/browse/KAFKA-9966
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/285/testReport/junit/org.apache.kafka.streams.integration/EosBetaUpgradeIntegrationTest/shouldUpgradeFromEosAlphaToEosBeta_true_/]
> {quote}java.lang.AssertionError: Condition not met within timeout 6. 
> Clients did not startup and stabilize on time. Observed transitions: client-1 
> transitions: [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] 
> client-2 transitions: [KeyValue(CREATED, REBALANCING), KeyValue(REBALANCING, 
> RUNNING), KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)] at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.waitForStateTransition(EosBetaUpgradeIntegrationTest.java:924)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:741){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread Guozhang Wang
Thanks Sophie for the KIP, a few quick thoughts:

1) The end-to-end latency includes both the processing latency of the task
and the time spent sitting in intermediate topics. I have a similar
feeling to what Boyang mentioned above: the latency metric of a task A
actually measures the latency of the sub-topology up to, but not including,
the processing of A, which is a bit weird.

Maybe my feeling comes from the name "latency" itself: today we already
have several "latency" metrics which measure elapsed system time for
processing a record, etc., while here we are comparing the system wall-clock
time with the record timestamp.

Maybe we can consider renaming it as "record-staleness" (note we already
have a "record-lateness" metric), in which case recording at the
system-time before we start processing the record sounds more natural.

2) With that in mind, I'm wondering whether the processor-node-level DEBUG
metric is worth adding, given that we already have a task-level processing
latency metric. Basically, a specific node's e2e latency is roughly the
task-level e2e latency + the task-level processing latency. Personally I think
having a task-level record-staleness metric is sufficient.
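
For concreteness, the quantity we are talking about is just the wall-clock
time minus the record's timestamp at the moment the task picks the record
up. A minimal sketch under that assumption (not the actual Streams metrics
code):

```
import org.apache.kafka.clients.consumer.ConsumerRecord;

class RecordStalenessSketch {
    // "Staleness" when the record is received: now minus the record's timestamp.
    static long recordStalenessMs(final ConsumerRecord<?, ?> record, final long nowMs) {
        return Math.max(0L, nowMs - record.timestamp());
    }
}
```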



Guozhang



On Wed, May 13, 2020 at 11:46 AM Sophie Blee-Goldman 
wrote:

> 1. I felt that 50% was not a particularly useful gauge for this specific
> metric, as
> it's presumably most useful at putting an *upper *bound on the latency you
> can
> reasonably expect to see. I chose percentiles that would hopefully give a
> good
> sense of what *most* records will experience, and what *close to all*
> records
> will.
>
> However I'm not married to these specific numbers and could be convinced.
> Would be especially interested in hearing from users on this.
>
> 2. I'm inclined to not include the "hop-to-hop latency" in this KIP since
> users
> can always compute it themselves by subtracting the previous node's
> end-to-end latency. I guess we could do it either way since you can always
> compute one from the other, but I think the end-to-end latency feels more
> valuable as it's main motivation is not to debug bottlenecks in the
> topology but
> to give users a sense of how long it takes arecord to be reflected in
> certain parts
> of the topology. For example this might be useful for users who are
> wondering
> roughly when a record that was just produced will be included in their IQ
> results.
> Debugging is just a nice side effect -- but maybe I didn't make that clear
> enough
> in the KIP's motivation.
>
> 3. Good question, I should address this in the KIP. The short answer is
> "yes",
> we will include late records. I added a paragraph to the end of the
> Proposed
> Changes section explaining the reasoning here, please let me know if you
> have
> any concerns.
>
> 4. Assuming you're referring to the existing metric "process-latency", that
> metric
> reflects the time for the literal Node#process method to run whereas this
> metric
> would always be measured relative to the event timestamp.
>
> That said, the naming collision there is pretty confusing so I've renamed
> the
> metrics in this KIP to "end-to-end-latency" which I feel better reflects
> the nature
> of the metric anyway.
>
> Thanks for the feedback!
>
> On Wed, May 13, 2020 at 10:21 AM Boyang Chen 
> wrote:
>
> > Thanks for the KIP Sophie. Getting the E2E latency is important for
> > understanding the bottleneck of the application.
> >
> > A couple of questions and ideas:
> >
> > 1. Could you clarify the rational of picking 75, 99 and max percentiles?
> > Normally I see cases where we use 50, 90 percentile as well in production
> > systems.
> >
> > 2. The current latency being computed is cumulative, I.E if a record goes
> > through A -> B -> C, then P(C) = T(B->C) + P(B) = T(B->C) + T(A->B) +
> T(A)
> > and so on, where P() represents the captured latency, and T() represents
> > the time for transiting the records between two nodes, including
> processing
> > time. For monitoring purpose, maybe having T(B->C) and T(A->B) are more
> > natural to view as "hop-to-hop latency", otherwise if there is a spike in
> > T(A->B), both P(B) and P(C) are affected in the same time. In the same
> > spirit, the E2E latency is meaningful only when the record exits from the
> > sink as this marks the whole time this record spent inside the funnel. Do
> > you think we could have separate treatment for sink nodes and other
> > nodes, so that other nodes only count the time receiving the record from
> > last hop? I'm not proposing a solution here, just want to discuss this
> > alternative to see if it is reasonable.
> >
> > 3. As we are going to monitor late arrival records as well, they would
> > create some really spiky graphs when the out-of-order records are
> > interleaving with on time records. Should we also supply a smooth version
> > of the latency metrics, or user should just take care of it by themself?
> >
> > 4. Regarding this new metrics, we haven't

Re: [DISCUSS] KIP-607: Add Metrics to Record the Memory Used by RocksDB to Kafka Streams

2020-05-13 Thread Guozhang Wang
Hi Bruno,

Sounds good to me.

I think I'm just a bit more curious to see its impact on performance: as
long as we have one INFO-level RocksDB metric, we'd have to turn on the
scheduled RocksDB metrics recorder, whereas previously we could decide not
to turn on the recorder at all if all of them were set to DEBUG and we
configured the INFO level in production. But this is an implementation
detail anyway and maybe the impact is negligible after all. We can check
and re-discuss this afterwards :)


Guozhang


On Wed, May 13, 2020 at 9:34 AM Sophie Blee-Goldman 
wrote:

> Thanks Bruno! I took a look at the revised KIP and it looks good to me.
>
> Sophie
>
> On Wed, May 13, 2020 at 6:59 AM Bruno Cadonna  wrote:
>
> > Hi John,
> >
> > Thank you for the feedback!
> >
> > I agree and I will change the KIP as I stated in my previous e-mail to
> > Guozhang.
> >
> > Best,
> > Bruno
> >
> > On Tue, May 12, 2020 at 3:07 AM John Roesler 
> wrote:
> > >
> > > Thanks, all.
> > >
> > > If you don’t mind, I’ll pitch in a few cents’ worth.
> > >
> > > In my life I’ve generally found more granular metrics to be more
> useful,
> > as long as there’s a sane way to roll them up. It does seem nice to see
> it
> > on the per-store level. For roll-up purposes, the task and thread tags
> > should be sufficient.
> > >
> > > I think the only reason we make some metrics Debug is that _recording_
> > them can be expensive. If there’s no added expense, I think we can just
> > register store-level metrics at Info level.
> > >
> > > Thanks for the KIP, Bruno!
> > > -John
> > >
> > > On Mon, May 11, 2020, at 17:32, Guozhang Wang wrote:
> > > > Hello Sophie / Bruno,
> > > >
> > > > I've also thought about the leveling question, and one motivation I
> > had for
> > > > setting it in instance-level is that we want to expose it in INFO
> > level:
> > > > today our report leveling is not very finer grained --- which I think
> > is
> > > > sth. worth itself --- such that one have to either turn on all DEBUG
> > > > metrics recording or none of them. If we can allow users to e.g.
> > specify
> > > > "turn on streams-metrics and stream-state-metrics, but not others"
> and
> > then
> > > > I think it should be just at store-level. However, right now if we
> > want to
> > > > set it as store-level then it would be DEBUG and not exposed by
> > default.
> > > >
> > > > So it seems we have several options in addition to the proposed one:
> > > >
> > > > a) we set it at store-level as INFO; but then one can argue why this
> is
> > > > INFO while others (bytes-written, etc) are DEBUG.
> > > > b) we set it at store-level as DEBUG, believing that we do not
> usually
> > need
> > > > to turn it on.
> > > > c) maybe, we can set it at task-level (? I'm not so sure myself about
> > > > this.. :P) as INFO.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, May 11, 2020 at 12:29 PM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hey Bruno,
> > > > >
> > > > > Thanks for the KIP! I have one high-level concern, which is that we
> > should
> > > > > consider
> > > > > reporting these metrics on the per-store level rather than
> > instance-wide. I
> > > > > know I was
> > > > > the one who first proposed making it instance-wide, so bear with
> me:
> > > > >
> > > > > While I would still argue that the instance-wide memory usage is
> > probably
> > > > > the most *useful*,
> > > > > exposing them at the store-level does not prevent users from
> > monitoring the
> > > > > instance-wide
> > > > > memory. They should be able to roll up all the store-level metrics
> > on an
> > > > > instance to
> > > > > compute the total off-heap memory. But rolling it up for the users
> > does
> > > > > prevent them from
> > > > > using this to debug rare cases where one store may be using
> > significantly
> > > > > more memory than
> > > > > expected.
> > > > >
> > > > > It's also worth considering that some users may be using the
> bounded
> > memory
> > > > > config setter
> > > > > to put a cap on the off-heap memory of the entire process, in which
> > case
> > > > > the memory usage
> > > > > metric for any one store should reflect the memory usage of the
> > entire
> > > > > instance. In that case
> > > > > any effort to roll up the memory usages ourselves would just be
> > wasted.
> > > > >
> > > > > Sorry for the reversal, but after a second thought I'm pretty
> > strongly in
> > > > > favor of reporting these
> > > > > at the store level.
> > > > >
> > > > > Best,
> > > > > Sophie
> > > > >
> > > > > On Wed, May 6, 2020 at 8:41 AM Bruno Cadonna 
> > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to discuss KIP-607 that aims to add RocksDB memory usage
> > > > > > metrics to Kafka Streams.
> > > > > >
> > > > > >
> > > > > >
> > > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Record+the+Memory+Used+by+RocksDB+to+Kafka+Streams
> > > > > >
> > > > > > Best,
> >

Build failed in Jenkins: kafka-trunk-jdk11 #1451

2020-05-13 Thread Apache Jenkins Server
See 

Changes:


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H30 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Could not init 

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$5.execute(CliGitAPIImpl.java:916)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$2.execute(CliGitAPIImpl.java:708)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:153)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:146)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
H30
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1743)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:957)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.execute(RemoteGitImpl.java:146)
at sun.reflect.GeneratedMethodAccessor1196.invoke(Unknown 
Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.invoke(RemoteGitImpl.java:132)
at com.sun.proxy.$Proxy137.execute(Unknown Source)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1152)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1192)
at hudson.scm.SCM.checkout(SCM.java:504)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused by: hudson.plugins.git.GitException: Error performing git command
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2181)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2140)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2136)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommand(CliGitAPIImpl.java:1741)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$5.execute(CliGitAPIImpl.java:914)
... 11 more
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at hudson.Proc$LocalProc.(Proc.java:282)
at hudson.Proc$LocalProc.(Proc.java:219)
at hudson.Launcher$LocalLauncher.launch(Launcher.java:937)
at hudson.Launcher$ProcStarter.start(Launcher.java:455)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2168)
... 15 more
ERROR: Error cloning remote repo 'origin'
Retrying after 10 seconds
No credentials specified
 > git rev-parse --is-inside-work-tree # timeout=10
ERROR: Workspace has a .git repository, but it appears to be corrupt.
hudson.plugins.git.GitException: Error performing git command
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launch

[jira] [Created] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task

2020-05-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9989:
--

 Summary: StreamsUpgradeTest.test_metadata_upgrade could not 
guarantee all processor gets assigned task
 Key: KAFKA-9989
 URL: https://issues.apache.org/jira/browse/KAFKA-9989
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen


System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:

"Never saw output 'processed [0-9]* records' on ubuntu@worker6"

Taking a closer look, the rebalance happens but produces no task assignment. We 
should fix this by making the rebalance result part of the check and skipping 
the record-processing validation when the assignment is empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread John Roesler
Hello all, and thanks for the KIP, Sophie,

Just some comments on the discussion so far:

B2/G1:
In principle, it shouldn't matter whether we report "spans" or
"end-to-end" latency. But in practice, some of the spans are pretty
difficult to really measure (like time spent waiting in the topics, or
time from the event happening to the ETL producer choosing to send it,
or time spent in send/receive buffers, etc., etc.).

In other words, it's practically easier to compute spans by subtracting
e2e latencies than it is to compute e2e latencies by adding spans. You
can even just consider that the span computation from e2e always just
involves subtracting two numbers, whereas computing e2e latency from
spans involves adding _all_ the spans leading up to the end you care about.

It seems like people really prefer to have spans when they are debugging
latency problems, whereas e2e latency is a more general measurement
that basically every person/application cares about and should be
monitoring.

Altogether, it really seems to provide more value to more people if we report
e2e latencies. Regarding "record-staleness" as a name, I have no
preference; I'd defer to other people's intuition.

G2:
I think the processor-node metric is nice, since the inside of a task can
introduce a significant amount of latency in some cases. Plus, it's a more
direct measurement, if you really wanted to know (for the purposes of IQ
or something) how long it takes source events to "show up" at the store.

I think actually recording it at every processor could be expensive, but we
already record a bunch of metrics at the node level.

B1:
I think 50% could be reasonable to record also. Even if it's a poor metric
for operational purposes, a lot of people might expect to see "mean". Actually,
I was surprised not to see "min". Is there a reason to leave it off?

I might suggest:
min, mean (50th), 75th, 99th, max

B3:
I agree we should include late records (though not the ones we drop).
It may be spiky, but only when there are legitimately some records with a
high end-to-end latency, which is the whole point of these metrics.

That's it! I don't think I have any other feedback, other than a request to
also report "min".

Thanks,
-John

On Wed, May 13, 2020, at 16:58, Guozhang Wang wrote:
> Thanks Sophie for the KIP, a few quick thoughts:
> 
> 1) The end-to-end latency includes both the processing latency of the task
> and the latency spent sitting in intermediate topics. I have a similar
> feeling as Boyang mentioned above that the latency metric of a task A
> actually measures the latency of the sub-topology up-to but not including
> the processing of A, which is a bit weird.
> 
> Maybe the my feeling comes from the name "latency" itself, since today we
> already have several "latency" metrics already which are measuring elapsed
> system-time for processing a record / etc, while here we are comparing the
> system wallclock time with the record timestamp.
> 
> Maybe we can consider renaming it as "record-staleness" (note we already
> have a "record-lateness" metric), in which case recording at the
> system-time before we start processing the record sounds more natural.
> 
> 2) With that in mind, I'm wondering if the processor-node-level DEBUG
> metric is worth to add, given that we already have a task-level processing
> latency metric. Basically, a specific node's e2e latency is similar to the
> task-level e2e latency + task-level processing latency. Personally I think
> having a task-level record-staleness metric is sufficient.
> 
> 
> 
> Guozhang
> 
> 
> 
> On Wed, May 13, 2020 at 11:46 AM Sophie Blee-Goldman 
> wrote:
> 
> > 1. I felt that 50% was not a particularly useful gauge for this specific
> > metric, as
> > it's presumably most useful at putting an *upper *bound on the latency you
> > can
> > reasonably expect to see. I chose percentiles that would hopefully give a
> > good
> > sense of what *most* records will experience, and what *close to all*
> > records
> > will.
> >
> > However I'm not married to these specific numbers and could be convinced.
> > Would be especially interested in hearing from users on this.
> >
> > 2. I'm inclined to not include the "hop-to-hop latency" in this KIP since
> > users
> > can always compute it themselves by subtracting the previous node's
> > end-to-end latency. I guess we could do it either way since you can always
> > compute one from the other, but I think the end-to-end latency feels more
> > valuable as it's main motivation is not to debug bottlenecks in the
> > topology but
> > to give users a sense of how long it takes arecord to be reflected in
> > certain parts
> > of the topology. For example this might be useful for users who are
> > wondering
> > roughly when a record that was just produced will be included in their IQ
> > results.
> > Debugging is just a nice side effect -- but maybe I didn't make that clear
> > enough
> > in the KIP's motivation.
> >
> > 3. Good question, I shoul

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-13 Thread Anna Povzner
I updated the KIP to add a new broker configuration to limit the connection
creation rate per IP: max.connection.creation.rate.per.ip. Once the limit
is reached for a particular IP address, the broker will reject connections
from that IP (close the connection it just accepted) and continue rejecting
them until the rate is back within the limit.
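
To illustrate the intended behaviour, here is a sketch of the semantics only,
not the broker implementation (the real enforcement would build on the
existing quota/sensor machinery, and the class name below is made up):

```
import java.net.InetAddress;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class PerIpConnectionRateLimiter {
    private final int maxConnectionCreationRatePerIp; // connections per second
    private final Map<InetAddress, Deque<Long>> recentAccepts = new HashMap<>();

    PerIpConnectionRateLimiter(final int maxConnectionCreationRatePerIp) {
        this.maxConnectionCreationRatePerIp = maxConnectionCreationRatePerIp;
    }

    // Returns true if a freshly accepted connection from this IP should be closed.
    synchronized boolean shouldReject(final InetAddress ip, final long nowMs) {
        final Deque<Long> accepts = recentAccepts.computeIfAbsent(ip, k -> new ArrayDeque<>());
        // Drop accept timestamps that fell out of the one-second window.
        while (!accepts.isEmpty() && nowMs - accepts.peekFirst() > 1000L) {
            accepts.pollFirst();
        }
        if (accepts.size() >= maxConnectionCreationRatePerIp) {
            return true;             // over the per-IP limit: reject (close) the connection
        }
        accepts.addLast(nowMs);      // within the limit: record the accept
        return false;
    }
}
```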

On Wed, May 13, 2020 at 11:46 AM Anna Povzner  wrote:

> Hi David and Alexandre,
>
> Thanks so much for your feedback! Here are my answers:
>
> 1. Yes, we have seen several cases of clients that create a new connection
> per produce/consume request. One hypothesis is someone who is used to
> connection pooling may accidentally write a Kafka client that creates a new
> connection every time.
>
> 2 & 4. That's a good point I haven't considered. I think it makes sense to
> provide an ability to limit connection creations per IP as well. This is
> not hard to implement -- the broker already keeps track of the number of
> connections per IP, and immediately closes a new connection if it comes
> from an IP that reached the connection limit. So, we could additionally
> track the rate, and close the connection from IP that exceeds the rate. One
> slight concern is whether keeping track of per IP rates and quotas adds
> overhead (CPU and memory). But perhaps it is not a problem if we use
> expiring sensors.
>
> It would still make sense to limit the overall connection creation rate
> for the Kafka clusters which are shared among many different
> applications/clients, since they may spike at the same time bringing the
> total rate too high.
>
> 3. Controlling connection queue sizes only controls the share of time
> network threads use for creating new connections (and accepting on Acceptor
> thread) vs. doing other work on each Processor iteration. It does not
> directly control how processing connection creations would be related to
> other processing done by brokers like on request handler threads. So, while
> controlling queue size may mitigate the issue for some of the workloads, it
> does not guarantee that. Plus, if we want to limit how many connections are
> created per IP, the queue size approach would not work, unless we go with a
> "share" of the queue, which I think even further obscures what that setting
> means (and what we would achieve as an end result). Does this answer the
> question?
>
> If there are no objections, I will update the KIP to add per IP connection
> rate limits (config and enforcement).
>
> Thanks,
>
> Anna
>
>
> On Tue, May 12, 2020 at 11:25 AM Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
>> Hello,
>>
>> Thank you for the KIP.
>>
>> I experienced in the past genuine broker brownouts due to connection
>> storms consuming most of the CPU available on the server and I think
>> it is useful to protect against it.
>>
>> I tend to share the questions asked in points 2 and 4 from David. Is
>> there still a risk of denial of service if the limit applies at the
>> listener-level without differentiating between (an) “offending”
>> client(s) and the others?
>>
>> To rebound on point 3 - conceptually one difference between capping
>> the queue size or throttling as presented in the KIP would come from
>> the time it takes to accept a connection and how that time evolves
>> with the connection rate.
>> Assuming that that time increases monotonically with resource
>> utilization, the admissible rate of connections would decrease as the
>> server becomes more loaded, if the limit was set on queue size.
>>
>> Thanks,
>> Alexandre
>>
>> Le mar. 12 mai 2020 à 08:49, David Jacot  a écrit :
>> >
>> > Hi Anna,
>> >
>> > Thanks for the KIP! I have few questions:
>> >
>> > 1. You mention that some clients may create a new connections for each
>> > requests: "Another example is clients that create a new connection for
>> each
>> > produce/consume request". I am curious here but do we know any clients
>> > behaving like this?
>> >
>> > 2. I am a bit concerned by the impact of misbehaving clients on the
>> other
>> > ones. Let's say that we define a quota of 10 connections / sec for a
>> broker
>> > and that we have a misbehaving application constantly trying to create
>> 20
>> > connections on that broker. That application will constantly hit the
>> quota
>> > and
>> > always have many pending connections in the queue waiting to be
>> accepted.
>> > Regular clients trying to connect would need to wait until all the
>> pending
>> > connections upfront in the queue are drained in the best case scenario
>> or
>> > won't be able to connect at all in the worst case scenario if the queue
>> is
>> > full.
>> > Does it sound like a valid concern? How do you see this?
>> >
>> > 3. As you mention it in the KIP, we use bounded queues which already
>> limit
>> > the maximum number of connections that can be accepted. I wonder if we
>> > could reach the same goal by making the size of the queue configurable.
>> >
>> > 4. Did you consider doing something similar to t

Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread Sophie Blee-Goldman
G1:
I was considering it as the "end-to-end latency *up* to the specific task"
but
I'm happy with "record-staleness" if that drives the point home better. So
it's the
"staleness of the record when it is received by that task" -- will update
the KIP

B1/J:
I'm struggling to imagine a case where the min would actually be useful,
rather than
just intellectually interesting. I don't feel strongly that we shouldn't
add it, but that's
why I didn't include it from the start. Can you enlighten me with an
example?

I was also vaguely concerned about the overhead of adding multiple
percentile
metrics. Do we have any data to indicate what kind of performance hit we
take on
metrics computation?

Also, not to be too pedantic but the 50th percentile would be the median
not the
mean. Would you propose to add the mean *and* the 50th percentile, or just
one
of the two?

Thanks all!
Sophie

On Wed, May 13, 2020 at 3:34 PM John Roesler  wrote:

> Hello all, and thanks for the KIP, Sophie,
>
> Just some comments on the discussion so far:
>
> B2/G1:
> In principle, it shouldn't matter whether we report "spans" or
> "end-to-end" latency. But in practice, some of the spans are pretty
> difficult to really measure (like time spent waiting in the topics, or
> time from the event happening to the ETL producer choosing to send it,
> or time spent in send/receive buffers, etc., etc.
>
> In other words, it's practically easier to compute spans by subtracting
> e2e latencies than it is to compute e2e latencies by adding spans. You
> can even just consider that the span computation from e2e always just
> involves subtracting two numbers, whereas computing e2e latency from
> spans involves adding _all_ the spans leading up to the end you care about.
>
> It seems like people really prefer to have spans when they are debugging
> latency problems, whereas e2e latency is a more general measurement
> that basically every person/application cares about and should be
> monitoring.
>
> Altogether, it really seem to provide more value to more people if we
> report
> e2e latencies. Regarding "record-staleness" as a name, I think I have no
> preference, I'd defer to other peoples' intuition.
>
> G2:
> I think the processor-node metric is nice, since the inside of a task can
> introduce a significant amount of latency in some cases. Plus, it's a more
> direct measurement, if you really wanted to know (for the purposes of IQ
> or something) how long it takes source events to "show up" at the store.
>
> I think actually recording it at every processor could be expensive, but we
> already record a bunch of metrics at the node level.
>
> B1:
> I think 50% could be reasonable to record also. Even if it's a poor metric
> for operational purposes, a lot of people might expect to see "mean".
> Actually,
> I was surprised not to see "min". Is there a reason to leave it off?
>
> I might suggest:
> min, mean (50th), 75th, 99th, max
>
> B3:
> I agree we should include late records (though not the ones we drop).
> It may be spiky, but only when there are legitimately some records with a
> high end-to-end latency, which is the whole point of these metrics.
>
> That's it! I don't think I have any other feedback, other than a request to
> also report "min".
>
> Thanks,
> -John
>
> On Wed, May 13, 2020, at 16:58, Guozhang Wang wrote:
> > Thanks Sophie for the KIP, a few quick thoughts:
> >
> > 1) The end-to-end latency includes both the processing latency of the
> task
> > and the latency spent sitting in intermediate topics. I have a similar
> > feeling as Boyang mentioned above that the latency metric of a task A
> > actually measures the latency of the sub-topology up-to but not including
> > the processing of A, which is a bit weird.
> >
> > Maybe the my feeling comes from the name "latency" itself, since today we
> > already have several "latency" metrics already which are measuring
> elapsed
> > system-time for processing a record / etc, while here we are comparing
> the
> > system wallclock time with the record timestamp.
> >
> > Maybe we can consider renaming it as "record-staleness" (note we already
> > have a "record-lateness" metric), in which case recording at the
> > system-time before we start processing the record sounds more natural.
> >
> > 2) With that in mind, I'm wondering if the processor-node-level DEBUG
> > metric is worth to add, given that we already have a task-level
> processing
> > latency metric. Basically, a specific node's e2e latency is similar to
> the
> > task-level e2e latency + task-level processing latency. Personally I
> think
> > having a task-level record-staleness metric is sufficient.
> >
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, May 13, 2020 at 11:46 AM Sophie Blee-Goldman <
> sop...@confluent.io>
> > wrote:
> >
> > > 1. I felt that 50% was not a particularly useful gauge for this
> specific
> > > metric, as
> > > it's presumably most useful at putting an *upper *bound on the latency
> you
> > > can
> > > reasonably expe

[jira] [Created] (KAFKA-9990) Supporting transactions in tiered storage

2020-05-13 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-9990:
-

 Summary: Supporting transactions in tiered storage
 Key: KAFKA-9990
 URL: https://issues.apache.org/jira/browse/KAFKA-9990
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread John Roesler
Oh boy, I never miss an opportunity to embarrass myself. I guess the mean seems 
more interesting to me than the median, but neither is as interesting as the 
higher percentiles (99th and max). 

Min isn’t really important for any SLAs, but it does round out the mental 
picture of the distribution. I’ve always graphed min along with the other 
metrics to help me understand how fast the system can be, which helps in 
optimization decisions. It’s also a relatively inexpensive metric to compute, 
so it might be nice to just throw it in. 

On Wed, May 13, 2020, at 18:18, Sophie Blee-Goldman wrote:
> G1:
> I was considering it as the "end-to-end latency *up* to the specific task"
> but
> I'm happy with "record-staleness" if that drives the point home better. So
> it's the
> "staleness of the record when it is received by that task" -- will update
> the KIP
> 
> B1/J:
> I'm struggling to imagine a case where the min would actually be useful,
> rather than
> just intellectually interesting. I don't feel strongly that we shouldn't
> add it, but that's
> why I didn't include it from the start. Can you enlighten me with an
> example?
> 
> I was also vaguely concerned about the overhead of adding multiple
> percentile
> metrics. Do we have any data to indicate what kind of performance hit we
> take on
> metrics computation?
> 
> Also, not to be too pedantic but the 50th percentile would be the median
> not the
> mean. Would you propose to add the mean *and* the 50th percentile, or just
> one
> of the two?
> 
> Thanks all!
> Sophie
> 
> On Wed, May 13, 2020 at 3:34 PM John Roesler  wrote:
> 
> > Hello all, and thanks for the KIP, Sophie,
> >
> > Just some comments on the discussion so far:
> >
> > B2/G1:
> > In principle, it shouldn't matter whether we report "spans" or
> > "end-to-end" latency. But in practice, some of the spans are pretty
> > difficult to really measure (like time spent waiting in the topics, or
> > time from the event happening to the ETL producer choosing to send it,
> > or time spent in send/receive buffers, etc., etc.
> >
> > In other words, it's practically easier to compute spans by subtracting
> > e2e latencies than it is to compute e2e latencies by adding spans. You
> > can even just consider that the span computation from e2e always just
> > involves subtracting two numbers, whereas computing e2e latency from
> > spans involves adding _all_ the spans leading up to the end you care about.
> >
> > It seems like people really prefer to have spans when they are debugging
> > latency problems, whereas e2e latency is a more general measurement
> > that basically every person/application cares about and should be
> > monitoring.
> >
> > Altogether, it really seem to provide more value to more people if we
> > report
> > e2e latencies. Regarding "record-staleness" as a name, I think I have no
> > preference, I'd defer to other peoples' intuition.
> >
> > G2:
> > I think the processor-node metric is nice, since the inside of a task can
> > introduce a significant amount of latency in some cases. Plus, it's a more
> > direct measurement, if you really wanted to know (for the purposes of IQ
> > or something) how long it takes source events to "show up" at the store.
> >
> > I think actually recording it at every processor could be expensive, but we
> > already record a bunch of metrics at the node level.
> >
> > B1:
> > I think 50% could be reasonable to record also. Even if it's a poor metric
> > for operational purposes, a lot of people might expect to see "mean".
> > Actually,
> > I was surprised not to see "min". Is there a reason to leave it off?
> >
> > I might suggest:
> > min, mean (50th), 75th, 99th, max
> >
> > B3:
> > I agree we should include late records (though not the ones we drop).
> > It may be spiky, but only when there are legitimately some records with a
> > high end-to-end latency, which is the whole point of these metrics.
> >
> > That's it! I don't think I have any other feedback, other than a request to
> > also report "min".
> >
> > Thanks,
> > -John
> >
> > On Wed, May 13, 2020, at 16:58, Guozhang Wang wrote:
> > > Thanks Sophie for the KIP, a few quick thoughts:
> > >
> > > 1) The end-to-end latency includes both the processing latency of the
> > task
> > > and the latency spent sitting in intermediate topics. I have a similar
> > > feeling as Boyang mentioned above that the latency metric of a task A
> > > actually measures the latency of the sub-topology up-to but not including
> > > the processing of A, which is a bit weird.
> > >
> > > Maybe the my feeling comes from the name "latency" itself, since today we
> > > already have several "latency" metrics already which are measuring
> > elapsed
> > > system-time for processing a record / etc, while here we are comparing
> > the
> > > system wallclock time with the record timestamp.
> > >
> > > Maybe we can consider renaming it as "record-staleness" (note we already
> > > have a "record-lateness" metric), in which case reco

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-13 Thread Guozhang Wang
Thanks Adam, Sagar.

I read your PR as well the rocksDB reference, and I have a few quick
questions:

1. In your code I saw that you did not specifically override any RocksDB
configs like `useFixedLengthPrefixExtractor`. Also, comparing the
`RocksDBPrefixIterator` and `RocksDBRangeIterator` classes, their `seek()`
calls are no different, so consider two calls:

range(bytesFrom, bytesTo) where bytesTo is just `bytesFrom + 1`.
prefix(bytesPrefix)

Shouldn't their performance be exactly the same? Do we need to enable a prefix
extractor --- note that although we have a bloom filter today, it is based not
on the prefix but on the whole key, with 10 bytes / block-based mode by default
--- in order to get better performance?

2. Also, my understanding is that if the byte length in the `iterator#seek()`
call is smaller than the prefix length, then it would not have any effect;
only if the bytes passed into `iterator#seek()` are longer would we save
extra IOs. Is that right?

3. Assuming that we can indeed get the perf benefits by turning on some
configs (then existing stores cannot use it unless we restarts them with
new configs), I looked at the code a bit deeper and here's a wild thought
--- I think you have already been there, so maybe you can let me know what
blocks that idea:

a) Add a new

```
KeyValueIterator prefixRange(final Bytes prefix, final Bytes
from, final Bytes to);
```

to the internal interface `Segment`.

b) Add its implementation `prefixRange(final Bytes prefix, final Bytes
from, final Bytes to)` that relies on the augmented `RocksDBPrefixIterator`
where we first seek by the prefix, and then continue dropping makeNext
until we hit the `from` bytes, and then start returning the values until we
hit `to` bytes and stop.

c) `WindowStoreIterator ReadOnlyWindowStore#fetch(K key, Instant from,
Instant to)` then can be implemented as `prefixRange(final Bytes prefix,
final Bytes from, final Bytes to)`.
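
A rough sketch of what (b) could look like (names and the Segment plumbing
are illustrative, not the actual Streams internals); it relies on RocksDB's
default lexicographic key ordering:

```
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

class PrefixRangeSketch {

    interface KeyValueConsumer {
        void accept(byte[] key, byte[] value);
    }

    // Seek to the first key >= prefix, drop keys below `from`, return keys up
    // to `to`, and stop once we pass `to` or the prefix no longer matches.
    static void prefixRange(final RocksDB db, final byte[] prefix,
                            final byte[] from, final byte[] to,
                            final KeyValueConsumer consumer) {
        try (final RocksIterator iter = db.newIterator()) {
            iter.seek(prefix);                               // first key >= prefix
            while (iter.isValid() && hasPrefix(iter.key(), prefix)) {
                final byte[] key = iter.key();
                if (compare(key, to) > 0) {
                    break;                                   // past the upper bound: stop
                }
                if (compare(key, from) >= 0) {
                    consumer.accept(key, iter.value());      // inside [from, to]
                }
                iter.next();
            }
        }
    }

    private static boolean hasPrefix(final byte[] key, final byte[] prefix) {
        if (key.length < prefix.length) return false;
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) return false;
        }
        return true;
    }

    // Unsigned lexicographic comparison, matching RocksDB's default comparator.
    private static int compare(final byte[] a, final byte[] b) {
        final int min = Math.min(a.length, b.length);
        for (int i = 0; i < min; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }
}
```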


Let me know if that makes sense?


Guozhang



On Tue, May 12, 2020 at 5:02 PM Adam Bellemare 
wrote:

> Hi Guozhang
>
> For clarity, the issues I was running into was not about the actual
> *prefixSeek* function itself, but about exposing it to the same level of
> access as the *range* function throughout Kafka Streams. It required a lot
> of changes, and also required that most state stores stub it out since it
> wasn't clear how they would implement it. It was basically an overreaching
> API change that was easily solved (for the specific prefix-scan in FKJ) by
> simply using *range*. So to be clear, the blockers were predominantly
> around correctly handling the API changes, nothing to do with the
> mechanisms of the RocksDB prefix scanning.
>
> As for KAFKA-5285 I'll look into it more to see if I can get a better
> handle on the problem!
>
> Hope this helps clear it up.
>
> Adam
>
>
> On Tue, May 12, 2020 at 7:16 PM Guozhang Wang  wrote:
>
> > Hello Adam,
> >
> > I'm wondering if you can provide a bit more context on the blockers of
> > using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator class
> > but not used anywhere yet)? I'm currently looking at ways to allow some
> > secondary indices with rocksDB following some existing approaches
> > from CockroachDB etc so I'm very curious to learn your experience.
> >
> > 1) Before considering any secondary indices, a quick thought is that for
> > (key, timeFrom, timeTo) queries, we can easily replace the current
> > `range()` impl with a `prefixRange()` impl via a prefix iterator; though
> > for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated indeed
> > and hence existing `range()` impl may still be used.
> >
> > 2) Another related issue I've been pondering for a while is
> > around KAFKA-5285: with the default lexicograpic byte comparator, since
> the
> > key length varies, the combo (key, window) would have interleaving byte
> > layouts like:
> >
> > AAA0001  (key AAA, timestamp 0001)
> > AAA00011(key AAA0, timestamp 0011)
> > AAA0002  (key AAA, timestamp 0002)
> >
> > which is challenging for prefix seeks to work efficiently. Although we
> can
> > overwrite the byte-comparator in JNI it is very expensive and the cost of
> > JNI overwhelms its benefits. If you've got some ideas around it please
> lmk
> > as well.
> >
> > Guozhang
> >
> >
> >
> >
> > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare  >
> > wrote:
> >
> > > Hi Sagar
> > >
> > > I implemented a very similar interface for KIP-213, the foreign-key
> > joiner.
> > > We pulled it out of the final implementation and instead used RocksDB
> > range
> > > instead. You can see the particular code where we use
> RocksDB.range(...)
> > to
> > > get the same iterator result.
> > >
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > >
> > > We pulled it out because there were numerous awkward acrobatics to
> > > integrate *prefixS

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-13 Thread Sophie Blee-Goldman
Not to derail this KIP discussion, but to leave a few notes on some of the
RocksDB points that have come up:

Someone actually merged some long overdue performance improvements to
the RocksJava implementation (the PR was opened back in 2017! yikes).
I haven't looked into the prefix seek API closely enough to know how
relevant
this particular change is, and they are still improving things, but it
gives me some
faith.

There are some pretty promising results reported on the PR:
https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037

Regarding the custom comparator, they also recently merged this performance
improvement. The tl;dr is they reduced the slowdown of a custom comparator in
Java (relative to the native C++) from ~7x to ~5.2x at best, which is still not
great, but it would be interesting to run our own benchmarks and see how this
stacks up.

Of course, these are all new changes and as such will require us to upgrade
RocksDB to 6.x, which means they have to wait for us to release a 3.0. But
there's some talk about 3.0 coming in the next few releases, so consider it
food for not-so-future thought.


On Tue, May 12, 2020 at 5:02 PM Adam Bellemare 
wrote:

> Hi Guozhang
>
> For clarity, the issues I was running into was not about the actual
> *prefixSeek* function itself, but about exposing it to the same level of
> access as the *range* function throughout Kafka Streams. It required a lot
> of changes, and also required that most state stores stub it out since it
> wasn't clear how they would implement it. It was basically an overreaching
> API change that was easily solved (for the specific prefix-scan in FKJ) by
> simply using *range*. So to be clear, the blockers were predominantly
> around correctly handling the API changes, nothing to do with the
> mechanisms of the RocksDB prefix scanning.
>
> As for KAFKA-5285 I'll look into it more to see if I can get a better
> handle on the problem!
>
> Hope this helps clear it up.
>
> Adam
>
>
> On Tue, May 12, 2020 at 7:16 PM Guozhang Wang  wrote:
>
> > Hello Adam,
> >
> > I'm wondering if you can provide a bit more context on the blockers of
> > using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator class
> > but not used anywhere yet)? I'm currently looking at ways to allow some
> > secondary indices with rocksDB following some existing approaches
> > from CockroachDB etc so I'm very curious to learn your experience.
> >
> > 1) Before considering any secondary indices, a quick thought is that for
> > (key, timeFrom, timeTo) queries, we can easily replace the current
> > `range()` impl with a `prefixRange()` impl via a prefix iterator; though
> > for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated indeed
> > and hence existing `range()` impl may still be used.
> >
> > 2) Another related issue I've been pondering for a while is
> > around KAFKA-5285: with the default lexicograpic byte comparator, since
> the
> > key length varies, the combo (key, window) would have interleaving byte
> > layouts like:
> >
> > AAA0001  (key AAA, timestamp 0001)
> > AAA00011(key AAA0, timestamp 0011)
> > AAA0002  (key AAA, timestamp 0002)
> >
> > which is challenging for prefix seeks to work efficiently. Although we
> can
> > overwrite the byte-comparator in JNI it is very expensive and the cost of
> > JNI overwhelms its benefits. If you've got some ideas around it please
> lmk
> > as well.
> >
> > Guozhang
> >
> >
> >
> >
> > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare  >
> > wrote:
> >
> > > Hi Sagar
> > >
> > > I implemented a very similar interface for KIP-213, the foreign-key
> > joiner.
> > > We pulled it out of the final implementation and instead used RocksDB
> > range
> > > instead. You can see the particular code where we use
> RocksDB.range(...)
> > to
> > > get the same iterator result.
> > >
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > >
> > > We pulled it out because there were numerous awkward acrobatics to
> > > integrate *prefixSeek()* function into the Kafka Streams code.
> > Basically, I
> > > wanted to be able to access *prefixSeek()* the same way I can access
> > > *range()* for any state store, and in particular use it for storing
> data
> > > with a particular foreign key (as per the previous URL). However, I
> found
> > > out that it required way too many changes to expose the *prefixSeek()*
> > > functionality while still being able to leverage all the nice Kafka
> > Streams
> > > state management + supplier functionality, so we made a decision just
> to
> > > stick with *range()* and pull everything else out.
> > >
> > > I guess my question here is, how do you anticipate using *prefixSeek()*
> > > within the framework of Kafka Streams, or the Proc

Subscribe to Kafka dev mailing list

2020-05-13 Thread 108414055
Sent from my iPhone

Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread Sophie Blee-Goldman
Alright, I can get behind adding the min metric for the sake of pretty
graphs
(and trivial computation).

I'm still on the fence regarding the mean (or 50th percentile) but I can see
how users might expect it and find it a bit disorienting not to have. So the
updated proposed metrics are


   - record-staleness-max [ms]
   - record-staleness-99th [ms] *(99th percentile)*
   - record-staleness-75th [ms] *(75th percentile)*
   - record-staleness-avg [ms] *(mean)*
   - record-staleness-min [ms]
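
For a rough sense of what these statistics involve, here's a naive
illustration computed over a batch of sampled staleness values (the real
implementation would go through Kafka's metrics machinery, not code like
this):

```
import java.util.Arrays;

class StalenessStatsSketch {
    // Naive computation of the proposed statistics from a sorted sample window.
    static void report(final long[] stalenessSamplesMs) {
        final long[] sorted = stalenessSamplesMs.clone();
        Arrays.sort(sorted);
        final long min = sorted[0];
        final long max = sorted[sorted.length - 1];
        final double avg = Arrays.stream(sorted).average().orElse(0.0);
        final long p75 = sorted[(int) Math.floor(0.75 * (sorted.length - 1))];
        final long p99 = sorted[(int) Math.floor(0.99 * (sorted.length - 1))];
        System.out.printf("min=%d avg=%.1f 75th=%d 99th=%d max=%d (ms)%n", min, avg, p75, p99, max);
    }

    public static void main(String[] args) {
        report(new long[]{12, 15, 18, 22, 30, 45, 60, 90, 250, 1200});
    }
}
```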


On Wed, May 13, 2020 at 4:42 PM John Roesler  wrote:

> Oh boy, I never miss an opportunity to embarrass myself. I guess the mean
> seems more interesting to me than the median, but neither are as
> interesting as the higher percentiles (99th and max).
>
> Min isn’t really important for any SLAs, but it does round out the mental
> picture of the distribution. I’ve always graphed min along with the other
> metrics to help me understand how fast the system can be, which helps in
> optimization decisions. It’s also a relatively inexpensive metric to
> compute, so it might be nice to just throw it in.
>
> On Wed, May 13, 2020, at 18:18, Sophie Blee-Goldman wrote:
> > G1:
> > I was considering it as the "end-to-end latency *up* to the specific
> task"
> > but
> > I'm happy with "record-staleness" if that drives the point home better.
> So
> > it's the
> > "staleness of the record when it is received by that task" -- will update
> > the KIP
> >
> > B1/J:
> > I'm struggling to imagine a case where the min would actually be useful,
> > rather than
> > just intellectually interesting. I don't feel strongly that we shouldn't
> > add it, but that's
> > why I didn't include it from the start. Can you enlighten me with an
> > example?
> >
> > I was also vaguely concerned about the overhead of adding multiple
> > percentile
> > metrics. Do we have any data to indicate what kind of performance hit we
> > take on
> > metrics computation?
> >
> > Also, not to be too pedantic but the 50th percentile would be the median
> > not the
> > mean. Would you propose to add the mean *and* the 50th percentile, or
> just
> > one
> > of the two?
> >
> > Thanks all!
> > Sophie
> >
> > On Wed, May 13, 2020 at 3:34 PM John Roesler 
> wrote:
> >
> > > Hello all, and thanks for the KIP, Sophie,
> > >
> > > Just some comments on the discussion so far:
> > >
> > > B2/G1:
> > > In principle, it shouldn't matter whether we report "spans" or
> > > "end-to-end" latency. But in practice, some of the spans are pretty
> > > difficult to really measure (like time spent waiting in the topics, or
> > > time from the event happening to the ETL producer choosing to send it,
> > > or time spent in send/receive buffers, etc., etc.
> > >
> > > In other words, it's practically easier to compute spans by subtracting
> > > e2e latencies than it is to compute e2e latencies by adding spans. You
> > > can even just consider that the span computation from e2e always just
> > > involves subtracting two numbers, whereas computing e2e latency from
> > > spans involves adding _all_ the spans leading up to the end you care
> about.
> > >
> > > It seems like people really prefer to have spans when they are
> debugging
> > > latency problems, whereas e2e latency is a more general measurement
> > > that basically every person/application cares about and should be
> > > monitoring.
> > >
> > > Altogether, it really seem to provide more value to more people if we
> > > report
> > > e2e latencies. Regarding "record-staleness" as a name, I think I have
> no
> > > preference, I'd defer to other peoples' intuition.
> > >
> > > G2:
> > > I think the processor-node metric is nice, since the inside of a task
> can
> > > introduce a significant amount of latency in some cases. Plus, it's a
> more
> > > direct measurement, if you really wanted to know (for the purposes of
> IQ
> > > or something) how long it takes source events to "show up" at the
> store.
> > >
> > > I think actually recording it at every processor could be expensive,
> but we
> > > already record a bunch of metrics at the node level.
> > >
> > > B1:
> > > I think 50% could be reasonable to record also. Even if it's a poor
> metric
> > > for operational purposes, a lot of people might expect to see "mean".
> > > Actually,
> > > I was surprised not to see "min". Is there a reason to leave it off?
> > >
> > > I might suggest:
> > > min, mean (50th), 75th, 99th, max
> > >
> > > B3:
> > > I agree we should include late records (though not the ones we drop).
> > > It may be spiky, but only when there are legitimately some records
> with a
> > > high end-to-end latency, which is the whole point of these metrics.
> > >
> > > That's it! I don't think I have any other feedback, other than a
> request to
> > > also report "min".
> > >
> > > Thanks,
> > > -John
> > >
> > > On Wed, May 13, 2020, at 16:58, Guozhang Wang wrote:
> > > > Thanks Sophie for the KIP, a few quick thoughts:
> > > >
> > > > 1) The end-to-end latency includes both 

[jira] [Resolved] (KAFKA-9465) Enclose consumer call with catching InvalidOffsetException

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9465.

Resolution: Not A Problem

This was fixed implicitly via some code refactoring.

> Enclose consumer call with catching InvalidOffsetException
> --
>
> Key: KAFKA-9465
> URL: https://issues.apache.org/jira/browse/KAFKA-9465
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In maybeUpdateStandbyTasks, the try block encloses restoreConsumer.poll and 
> record handling.
> Since InvalidOffsetException is thrown by restoreConsumer.poll, we should 
> enclose this call in the try block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-615: add ConstrainedCooperativeStickyAssignor

2020-05-13 Thread Sophie Blee-Goldman
Putting this to rest as one of the shortest-lived KIPs: I've decided to
just improve the existing CooperativeStickyAssignor rather than
introduce an entirely new one. Since this will not involve changing
any public APIs, there's no need for a KIP anymore.

Let me know if you have any questions or concerns

On Wed, May 13, 2020 at 10:53 AM Sophie Blee-Goldman 
wrote:

> Hey all,
>
> I'd like to propose adding another OOTB cooperative assignor to better
> meet the needs of some users who don't need to full flexibility of the
> existing CooperativeStickyAssignor:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-615%3A+add+ConstrainedCooperativeStickyAssignor
>
> I included a sketch of the assignment algorithm in the KIP to give a rough
> idea of what this would look like (and that it is possible), but this is
> obviously an implementation detail and subject to change.
>
> Cheer,
> Sophie
>


[jira] [Resolved] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process

2020-05-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9850.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Move KStream#repartition operator validation during Topology build process 
> ---
>
> Key: KAFKA-9850
> URL: https://issues.apache.org/jira/browse/KAFKA-9850
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Levani Kokhreidze
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: help-wanted, newbie, newbie++
> Fix For: 2.6.0
>
>
> `KStream#repartition` operation performs most of its validation regarding 
> joining, co-partitioning, etc after starting Kafka Streams instance. Some 
> parts of this validation can be detected much earlier, specifically during 
> topology `build()`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9688) kafka-topic.sh should show KIP-455 adding and removing replicas

2020-05-13 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-9688.
-
Fix Version/s: 2.5.0
 Reviewer: Colin McCabe
   Resolution: Fixed

> kafka-topic.sh should show KIP-455 adding and removing replicas
> ---
>
> Key: KAFKA-9688
> URL: https://issues.apache.org/jira/browse/KAFKA-9688
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Colin McCabe
>Assignee: Cheng Tan
>Priority: Major
> Fix For: 2.5.0
>
>
> kafka-topic.sh should show KIP-455 adding and removing replicas, as described 
> in KIP-455.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled

2020-05-13 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9991:
--

 Summary: Flaky Test 
KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
 Key: KAFKA-9991
 URL: https://issues.apache.org/jira/browse/KAFKA-9991
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/]

 
h3. Stacktrace

java.lang.AssertionError: Condition not met within timeout 3. Table did not 
read all values at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) 
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at 
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205)
 at 
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159)
 at 
org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is back to normal : kafka-trunk-jdk8 #4526

2020-05-13 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread John Roesler
Thanks Sophie,

I hope this isn't too nit-picky, but is there a reason to choose "avg" instead
of "mean"? Maybe this is too paranoid, and I might be oversensitive because
of the mistake I just made earlier, but it strikes me that "avg" is actually
ambiguous, as it refers to a family of statistics, whereas "mean" is specific.
I see other Kafka metrics with "avg", but none with "mean"; was that the
reason? If so, I'm +1.

Regarding the names of the percentile, I actually couldn't find _any_ other
metrics that use percentile. Was there a reason to choose "99th" as opposed
to "p99" or any other scheme? This is not a criticism, I'm just primarily asking
for consistency's sake.

Thanks again,
-John

On Wed, May 13, 2020, at 19:19, Sophie Blee-Goldman wrote:
> Alright, I can get behind adding the min metric for the sake of pretty
> graphs
> (and trivial computation).
> 
> I'm still on the fence regarding the mean (or 50th percentile) but I can see
> how users might expect it and find it a bit disorienting not to have. So the
> updated proposed metrics are
> 
> 
>- record-staleness-max [ms]
>- record-staleness-99th [ms] *(99th percentile)*
>- record-staleness-75th [ms] *(75th percentile)*
>- record-staleness-avg [ms] *(mean)*
>- record-staleness-min [ms]
> 
> 
> On Wed, May 13, 2020 at 4:42 PM John Roesler  wrote:
> 
> > Oh boy, I never miss an opportunity to embarrass myself. I guess the mean
> > seems more interesting to me than the median, but neither are as
> > interesting as the higher percentiles (99th and max).
> >
> > Min isn’t really important for any SLAs, but it does round out the mental
> > picture of the distribution. I’ve always graphed min along with the other
> > metrics to help me understand how fast the system can be, which helps in
> > optimization decisions. It’s also a relatively inexpensive metric to
> > compute, so it might be nice to just throw it in.
> >
> > On Wed, May 13, 2020, at 18:18, Sophie Blee-Goldman wrote:
> > > G1:
> > > I was considering it as the "end-to-end latency *up* to the specific
> > task"
> > > but
> > > I'm happy with "record-staleness" if that drives the point home better.
> > So
> > > it's the
> > > "staleness of the record when it is received by that task" -- will update
> > > the KIP
> > >
> > > B1/J:
> > > I'm struggling to imagine a case where the min would actually be useful,
> > > rather than
> > > just intellectually interesting. I don't feel strongly that we shouldn't
> > > add it, but that's
> > > why I didn't include it from the start. Can you enlighten me with an
> > > example?
> > >
> > > I was also vaguely concerned about the overhead of adding multiple
> > > percentile
> > > metrics. Do we have any data to indicate what kind of performance hit we
> > > take on
> > > metrics computation?
> > >
> > > Also, not to be too pedantic but the 50th percentile would be the median
> > > not the
> > > mean. Would you propose to add the mean *and* the 50th percentile, or
> > just
> > > one
> > > of the two?
> > >
> > > Thanks all!
> > > Sophie
> > >
> > > On Wed, May 13, 2020 at 3:34 PM John Roesler 
> > wrote:
> > >
> > > > Hello all, and thanks for the KIP, Sophie,
> > > >
> > > > Just some comments on the discussion so far:
> > > >
> > > > B2/G1:
> > > > In principle, it shouldn't matter whether we report "spans" or
> > > > "end-to-end" latency. But in practice, some of the spans are pretty
> > > > difficult to really measure (like time spent waiting in the topics, or
> > > > time from the event happening to the ETL producer choosing to send it,
> > > > or time spent in send/receive buffers, etc., etc.)
> > > >
> > > > In other words, it's practically easier to compute spans by subtracting
> > > > e2e latencies than it is to compute e2e latencies by adding spans. You
> > > > can even just consider that the span computation from e2e always just
> > > > involves subtracting two numbers, whereas computing e2e latency from
> > > > spans involves adding _all_ the spans leading up to the end you care
> > about.
> > > >
> > > > It seems like people really prefer to have spans when they are
> > debugging
> > > > latency problems, whereas e2e latency is a more general measurement
> > > > that basically every person/application cares about and should be
> > > > monitoring.
> > > >
> > > > Altogether, it really seems to provide more value to more people if we
> > > > report e2e latencies. Regarding "record-staleness" as a name, I think I have
> > > > no preference; I'd defer to other people's intuition.
> > > >
> > > > G2:
> > > > I think the processor-node metric is nice, since the inside of a task
> > can
> > > > introduce a significant amount of latency in some cases. Plus, it's a
> > more
> > > > direct measurement, if you really wanted to know (for the purposes of
> > IQ
> > > > or something) how long it takes source events to "show up" at the
> > store.
> > > >
> > > > I think actually recording it at every processor could be expensive,

Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-13 Thread Sophie Blee-Goldman
Yeah, the specific reason was just to align with the current metrics.

Is it better to conform than to be right? History has a lot to say on that
matter
but I'm not sure how much of it applies to the fine details of metrics
naming :P

More seriously, I figured if people are looking at this metric they're
likely to
be looking at all the others. Then naming this one "-mean" would probably
lead some to conclude that the "-avg" suffix in the other metrics has a
different meaning.

As for the percentiles, I actually like p99 (and p75) better. I'll swap
that out
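
For anyone curious how such a set of statistics gets wired up, here is a rough
sketch against the common metrics API (org.apache.kafka.common.metrics) that
Streams builds on. The sensor name, metric group, and histogram bounds below are
invented for illustration only and are not from the KIP.

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

public class RecordStalenessMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("record-staleness");
        String group = "stream-task-metrics-sketch"; // illustrative group name only

        sensor.add(metrics.metricName("record-staleness-max", group, "max staleness (ms)"), new Max());
        sensor.add(metrics.metricName("record-staleness-min", group, "min staleness (ms)"), new Min());
        sensor.add(metrics.metricName("record-staleness-avg", group, "mean staleness (ms)"), new Avg());

        // Percentiles are a compound stat backed by a fixed-size histogram;
        // the byte budget and the assumed maximum value are arbitrary here.
        sensor.add(new Percentiles(
            30 * 1000,
            10 * 60 * 1000d,
            BucketSizing.LINEAR,
            new Percentile(metrics.metricName("record-staleness-p99", group, "99th percentile (ms)"), 99),
            new Percentile(metrics.metricName("record-staleness-p75", group, "75th percentile (ms)"), 75)));

        // On each processed record: staleness = now - record timestamp.
        long recordTimestampMs = System.currentTimeMillis() - 250; // stand-in for the record's event time
        sensor.record(System.currentTimeMillis() - recordTimestampMs);
        metrics.close();
    }
}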

On Wed, May 13, 2020 at 7:07 PM John Roesler  wrote:

> Thanks Sophie,
>
> I hope this isn't too nit-picky, but is there a reason to choose "avg"
> instead
> of "mean"? Maybe this is too paranoid, and I might be oversensitive because
> of the mistake I just made earlier, but it strikes me that "avg" is
> actually
> ambiguous, as it refers to a family of statistics, whereas "mean" is
> specific.
> I see other Kafka metrics with "avg", but none with "mean"; was that the
> reason? If so, I'm +1.
>
> Regarding the names of the percentile, I actually couldn't find _any_ other
> metrics that use percentile. Was there a reason to choose "99th" as opposed
> to "p99" or any other scheme? This is not a criticism, I'm just primarily
> asking
> for consistency's sake.
>
> Thanks again,
> -John
>
> On Wed, May 13, 2020, at 19:19, Sophie Blee-Goldman wrote:
> > Alright, I can get behind adding the min metric for the sake of pretty
> > graphs
> > (and trivial computation).
> >
> > I'm still on the fence regarding the mean (or 50th percentile) but I can
> see
> > how users might expect it and find it a bit disorienting not to have. So
> the
> > updated proposed metrics are
> >
> >
> >- record-staleness-max [ms]
> >- record-staleness-99th [ms] *(99th percentile)*
> >- record-staleness-75th [ms] *(75th percentile)*
> >- record-staleness-avg [ms] *(mean)*
> >- record-staleness-min [ms]
> >
> >
> > On Wed, May 13, 2020 at 4:42 PM John Roesler 
> wrote:
> >
> > > Oh boy, I never miss an opportunity to embarrass myself. I guess the
> mean
> > > seems more interesting to me than the median, but neither are as
> > > interesting as the higher percentiles (99th and max).
> > >
> > > Min isn’t really important for any SLAs, but it does round out the
> mental
> > > picture of the distribution. I’ve always graphed min along with the
> other
> > > metrics to help me understand how fast the system can be, which helps
> in
> > > optimization decisions. It’s also a relatively inexpensive metric to
> > > compute, so it might be nice to just throw it in.
> > >
> > > On Wed, May 13, 2020, at 18:18, Sophie Blee-Goldman wrote:
> > > > G1:
> > > > I was considering it as the "end-to-end latency *up* to the specific
> > > task"
> > > > but
> > > > I'm happy with "record-staleness" if that drives the point home
> better.
> > > So
> > > > it's the
> > > > "staleness of the record when it is received by that task" -- will
> update
> > > > the KIP
> > > >
> > > > B1/J:
> > > > I'm struggling to imagine a case where the min would actually be
> useful,
> > > > rather than
> > > > just intellectually interesting. I don't feel strongly that we
> shouldn't
> > > > add it, but that's
> > > > why I didn't include it from the start. Can you enlighten me with an
> > > > example?
> > > >
> > > > I was also vaguely concerned about the overhead of adding multiple
> > > > percentile
> > > > metrics. Do we have any data to indicate what kind of performance
> hit we
> > > > take on
> > > > metrics computation?
> > > >
> > > > Also, not to be too pedantic but the 50th percentile would be the
> median
> > > > not the
> > > > mean. Would you propose to add the mean *and* the 50th percentile, or
> > > just
> > > > one
> > > > of the two?
> > > >
> > > > Thanks all!
> > > > Sophie
> > > >
> > > > On Wed, May 13, 2020 at 3:34 PM John Roesler 
> > > wrote:
> > > >
> > > > > Hello all, and thanks for the KIP, Sophie,
> > > > >
> > > > > Just some comments on the discussion so far:
> > > > >
> > > > > B2/G1:
> > > > > In principle, it shouldn't matter whether we report "spans" or
> > > > > "end-to-end" latency. But in practice, some of the spans are pretty
> > > > > difficult to really measure (like time spent waiting in the
> topics, or
> > > > > time from the event happening to the ETL producer choosing to send
> it,
> > > > > or time spent in send/receive buffers, etc., etc.)
> > > > >
> > > > > In other words, it's practically easier to compute spans by
> subtracting
> > > > > e2e latencies than it is to compute e2e latencies by adding spans.
> You
> > > > > can even just consider that the span computation from e2e always
> just
> > > > > involves subtracting two numbers, whereas computing e2e latency
> from
> > > > > spans involves adding _all_ the spans leading up to the end you
> care
> > > about.
> > > > >
> > > > > It seems like people really prefer to have spans when they are
> > > debugging

Jenkins build is back to normal : kafka-trunk-jdk11 #1452

2020-05-13 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-13 Thread Cheng Tan
Hi Rajini,

Thanks for the comments. 

> I think
> they started off as connection timeouts but now include authentication time
> as well. Have we considered using similar configs for this case?


The new config I proposed focuses on connections to unreachable servers. The 
timeout won’t include authentication time. I think the “socket.” prefix makes 
sense; I’ll change it. Colin mentioned that adding the keyword “setup” can help 
people understand that this config only applies to the connection setup. What 
about “socket.connection.setup.ms”?

> The KIP proposes 10s as the default. What does this mean for typical
> connections like a produce request going to the leader?

The new timeout config applies to each connection. It lives at the NetworkClient 
level and doesn’t consider the underlying request logic. Specifically, by 
default, every connection will have 10 seconds to go from “connecting” to 
“connected”, i.e. for the corresponding socket channel to finish connecting 
(SocketChannel.finishConnect() returns true), regardless of the request logic 
and abstraction above it.
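
To make the “connecting” to “connected” boundary concrete, here is a minimal 
plain-NIO sketch of where such a setup timeout would apply. It only illustrates 
the semantics; it is not the actual NetworkClient code, and the busy-wait below 
stands in for the client’s selector poll loop.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public final class ConnectionSetupTimeoutSketch {
    public static SocketChannel connect(InetSocketAddress addr, long setupTimeoutMs)
            throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(addr);                              // state: "connecting"
        long deadlineMs = System.currentTimeMillis() + setupTimeoutMs;
        while (!channel.finishConnect()) {                  // true once the TCP handshake completes
            if (System.currentTimeMillis() > deadlineMs) {
                channel.close();
                throw new IOException("Connection setup timed out after " + setupTimeoutMs + " ms");
            }
            // The real client would be driven by selector poll events here.
        }
        return channel;                                     // state: "connected"
    }
}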

Please let me know if these make sense to you or if you have more thoughts. 
Thank you.

Best, -Cheng

> 在 2020年5月13日,上午7:11,Rajini Sivaram  写道:
> 
> Hi Cheng,
> 
> Thanks for the KIP, sounds like a good improvement. A couple of comments:
> 
> 1) We currently have client connection timeouts on the broker with configs
> named `xxx.socket.timeout.ms` (e.g. controller.socket.timeout.ms). I think
> they started off as connection timeouts but now include authentication time
> as well. Have we considered using similar configs for this case? We may
> want to prefix the new config with `socket.` anyway - something along the
> lines of `socket.connection.timeout.ms` if it is just the connection time.
> 
> 2) The KIP proposes 10s as the default. What does this mean for typical
> connections like a produce request going to the leader? Instead of one
> connection attempt to the leader, we want three separate connection
> attempts within the request timeout to the leader?
> 
> Regards,
> 
> Rajini
> 
> 
>> On Thu, May 7, 2020 at 11:51 PM Jose Garcia Sancio 
>> wrote:
>> Cheng,
>> Thanks for the KIP and the detailed proposal section. LGTM!
 On Thu, May 7, 2020 at 3:38 PM Cheng Tan  wrote:
 I thought more about the potential wider use cases and modified the
>> proposal to target all connections. Thanks.
>>> - Best, - Cheng Tan
 On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
 Hi Colin,
 Sorry for the confusion. I’m proposing to implement the timeout in
>> NetworkClient.leastLoadedNode() when iterating all the cached nodes. The
>> alternative I can think of is to implement the timeout in NetworkClient.poll().
 I’d prefer to implement it in leastLoadedNode(). Here are the reasons:
 Usually when clients send a request, they ask the network
>> client to send the request to a specific node. In this case, the
>> connection.setup.timeout won’t matter too much because the client doesn’t
>> want to try other nodes for that specific request. The request-level
>> timeout would be enough. The metadata fetcher fetches the nodes’ status
>> periodically, so the clients can reassign the request to another node after
>> a timeout.
 Consumer, producer, and AdminClient all use leastLoadedNode()
>> for metadata fetches, where the connection setup timeout can play an
>> important role. Unlike other requests, which can refer to the metadata for
>> node condition, the metadata requests can only blindly choose a node for
>> retry in the worst case. We want to make sure the client can get the metadata
>> smoothly and as soon as possible. As a result, we need this
>> connection.setup.timeout.
 Implementing the timeout in poll() or anywhere else might need an
>> extra iteration over all nodes, which could degrade the network client’s
>> performance.
 I also updated the KIP content and KIP status. Please let me know if
>> the above ideas make sense. Thanks.
 Best, - Cheng Tan
> On May 4, 2020, at 5:26 PM, Colin McCabe > cmcc...@apache.org>> wrote:
> Hi Cheng,
> On the KIP page, it lists this KIP as "draft."  It seems like "under
>> discussion" is appropriate here, right?
>> Currently, the initial socket connection timeout is depending on
>> Linux kernel setting
>> tcp_syn_retries. The timeout value is 2 ^ (tcp_sync_retries + 1) - 1
>> seconds. For the
>> reasons below, we want to control the client-side socket timeout
>> directly using
>> configuration files
> Linux is just one example of an OS that Kafka could run on, right?
>> You could also be running on MacOS, for example.
>> I'm proposing to do a lazy socket connection time out. That is, we
>> only check if
>> we need to timeout a socket when we consider the corresponding node
>> as a
>> candidate in the node provider.
> The NodeProvider is an AdminClient abstraction, right?  Why wouldn't
>> we implement a connection setup timeou

Re: [DISCUSS] KIP-585: Conditional SMT

2020-05-13 Thread Konstantine Karantasis
Hi Tom.

Thanks for the KIP. I like how the proposal has ended up, and I think
it describes a practical approach.

I have to say that, for a moment, earlier in the discussion I thought we
were leaning a bit towards an unconventional mini assembly language based
on java properties.
The reliance on java properties for SMT definition and configuration has
drawn some criticism in the past, so keeping the model lean and appealing
as well as expressive is worth the effort in my opinion.

With that in mind, I feel that we could move forward without the question
mark notation. That approach would be similar to what was introduced when
SMTs were implemented by adding the `transforms` prefix. We'll have a top-level
property called `predicates` and a namespace starting with the
`predicates.` prefix followed by aliases. Avoiding the '?' in the
transform properties also seems OK to me, in order to keep things simple and
intuitive.
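
By analogy with the examples earlier in this thread, that scheme might look
roughly as follows (the alias, pattern, and predicate class are simply reused
from Tom's earlier example; the exact property names are not final):

transforms: t2
transforms.t2.predicate: has-prefix
transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field: c1
predicates: has-prefix
predicates.has-prefix.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatch
predicates.has-prefix.negate: true
predicates.has-prefix.pattern: my-prefix-.*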

Besides these suggestions, I have a few additional minor comments:

1. Should we extend the `AutoCloseable` interface instead of `Closeable`
since we ignore the Exception in the signature anyway? I know this will
make Predicate different from Transformation but it also corresponds better
to the intended use.
2. As Andrew mentioned, Predicate needs to be declared an interface in the
KIP.
3. Currently the examples are neither Java properties nor JSON. Should we
conform to one of the two formats to avoid confusing users who read
the KIP before trying this feature?

Thanks again for driving this proposal.

Konstantine

On Mon, May 11, 2020 at 7:57 AM Tom Bentley  wrote:

> Hi Andrew,
>
> That works nicely enough for the proposal where the predicate is configured
> directly on the transformation. But I thought there was more consensus
> around the proposal to have the transformation configuration refer to a
> predicate indirectly, defined with the ?predicates key. I guess an example
> would look like this:
>
> transforms: t2
> transforms.t2?predicate: has-prefix
> transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
> transforms.t2.field: c1
> ?predicates: has-prefix
> ?predicates.has-prefix.type:
> org.apache.kafka.connect.transforms.predicates.TopicNameMatch
> ?predicates.has-prefix.negate: true
> ?predicates.has-prefix.pattern: my-prefix-.*
>
> I agree that this seems to reduce the chance of conflict to practically
> nothing. I'm happy enough to go with this and I've updated the KIP
> accordingly.
>
> Assuming no one raises more concerns I'll start a vote soon.
>
> Kind regards,
>
> Tom
>
> On Mon, May 11, 2020 at 10:54 AM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > Hi,
> > I have implemented some of this and configured some predicates and I
> > prefer this slight modification of the ? syntax:
> >
> > transforms: t2
> > transforms.t2?type:
> > org.apache.kafka.connect.transforms.predicates.TopicNameMatch
> > transforms.t2?negate: true
> > transforms.t2?pattern: my-prefix-.*
> > transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
> > transforms.t2.field: c1
> >
> > So, "transforms.." introduces the config for the transformation,
> > while "transforms.?" introduces the
> > config for the predicate. The risk of a clash now is that someone has
> used
> > a ? in the alias name, which is unlikely.
> >
> > Also, there's a slight typo in the Java definition of the Predicate
> > interface. It should be:
> >
> > public interface Predicate<R extends ConnectRecord<R>> extends
> > Configurable, Closeable
> >
> > Looks like discussion has died down so I wonder whether it's time to
> begin
> > a vote.
> >
> > Cheers,
> > Andrew
> >
> > On 06/05/2020, 08:57, "Andrew Schofield" 
> > wrote:
> >
> > >Hi Tom,
> > >I think that either proposal is sufficiently readable and they both
> > have elements of
> > >cryptic syntax. I remember talking to an engineering director once
> > who said he didn't hire
> > >"curly braces people" any more. This is an interface for curly
> braces
> > people whichever
> > >way we go here.
> >
> > >I slightly prefer the predicates as a top-level concept rather than
> > as a feature
> > >of the If transform. I also don't like the namespacing behaviour of
> > the If transform which
> > >I guess works like variable declaration so each scope has its own
> > namespace of transform
> > >aliases. I expect the following is valid but ill-advised.
> >
> > >transforms: t1
> > >transforms.t1.type: If
> > >transforms.t1.!test.type: TopicNameMatch
> > >transforms.t1.!test.pattern: my-prefix-.*
> > >transforms.t1.then: t1
> > >transforms.t1.then.t1.type: ExtractField$Key
> > >transforms.t1.then.t1.field: c1
> >
> > >I would love to see conditional application of transforms in SMT in
> > Kafka 2.6.
> > >Let's see what other people think and get a consensus view on how to
> > spell it.
> >
> > >Cheers,
> > >Andrew
> >
> > >On 05/05/2020, 15:20, "Tom Bentley"  wro

Build failed in Jenkins: kafka-trunk-jdk8 #4527

2020-05-13 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Add a duplicate() method to Message classes (#8556)

[github] KAFKA-9850 Move KStream#repartition operator validation during Topolo…

[github] MINOR: add option to rebuild source for system tests (#6656)


--
[...truncated 6.14 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

Re: [VOTE] KIP-577: Allow HTTP Response Headers Configured for Kafka Connect

2020-05-13 Thread Konstantine Karantasis
Makes sense to allow users to comply with their requirements without taking
on the maintenance cost of keeping up with new headers across different
versions.
Thanks for the KIP Jeff.

+1 (binding)

Konstantine

On Tue, May 12, 2020 at 3:13 AM Manikumar  wrote:

> +1 (binding)
>
> Thanks for the KIP.
>
> On Wed, May 6, 2020 at 7:57 PM Randall Hauch  wrote:
>
> > Thanks for putting this together.
> >
> > +1 (binding)
> >
> > On Fri, Apr 17, 2020 at 2:02 PM Aneel Nazareth 
> wrote:
> >
> > > Thanks Jeff, this seems like it addresses a user need.
> > >
> > > +1 (non-binding)
> > >
> > > On Fri, Apr 17, 2020 at 1:28 PM Zhiguo Huang 
> > > wrote:
> > > >
> > > > Thanks to everyone for their input. I've incorporated the changes,
> and
> > I
> > > > think this is ready for voting.
> > > >
> > > > To summarize, the KIP simply proposes to add a feature which allows HTTP
> > > > response headers to be configured for Kafka Connect. The KIP can be found
> > here:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect
> > > >
> > > > Thanks!
> > > >
> > > > Jeff.
> > >
> >
>


Re: [VOTE] KIP-437: Custom replacement for MaskField SMT

2020-05-13 Thread Konstantine Karantasis
I think this improvement makes total sense. It's interesting that it didn't
accompany the initial version of this transformation.

+1 (binding)

Konstantine

On Wed, May 6, 2020 at 2:03 PM Randall Hauch  wrote:

> Thanks for starting the vote, Yu.
>
> +1 (binding)
>
> Randall
>
> On Sat, Dec 21, 2019 at 1:22 AM Yu Watanabe  wrote:
>
> > Thank for the KIP.
> > I really want this for my project.
> >
> > +1 (non-binding)
> >
>


[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator

2020-05-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-9676.

Resolution: Fixed

The current unit test coverage is pretty good now, closing the ticket.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> --
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit 
> test coverage. We should add corresponding tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)