[VOTE] KIP-791: Add Record Metadata to State Store Context

2021-11-08 Thread Patrick Stuedi
Hi all,

Thanks for the feedback on KIP-791, I have updated the KIP and would like
to start the voting.

The KIP can be found here:
https://cwiki.apache.org/confluence/x/I5BnCw

Please vote in this thread.

Thanks!
-Patrick


Re: [DISCUSS] KIP-794: Strictly Uniform Sticky Partitioner

2021-11-08 Thread Luke Chen
Thanks Artem,
It's much better now.
I've got your idea. In KIP-480: Sticky Partitioner, we change the partition
(i.e., call the partitioner) when either of the conditions below is met:
1. the batch is full
2. linger.ms has elapsed
But you are changing the trigger so that we switch partitions once
"partitioner.sticky.batch.size" bytes have been produced.

It'll fix the uneven distribution issue, because we now do the sent-size
calculation on the producer side.
But it might introduce another issue: when the produce rate is low, there
will be periods when the distribution is not even. Ex:
tp-1: 12KB
tp-2: 0KB
tp-3: 0KB
tp-4: 0KB
while the producer keeps sending records to tp-1 (because we
haven't reached the 16KB threshold).
Maybe the user should set a good value for "partitioner.sticky.batch.size"
to address this?
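
For illustration only: assuming the proposed config name from the KIP (it is
not available in released clients, and the 16384 value simply mirrors the
16KB example above), a producer would set it roughly like this:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class StickyBatchSizeExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Proposed in KIP-794 (not yet released): switch to the next partition once
        // roughly this many bytes have been produced to the current sticky partition.
        props.put("partitioner.sticky.batch.size", "16384");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", null, "a value")); // placeholder topic
        }
    }
}
{code}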

Some comments on the KIP:
1. This paragraph is a little confusing, because there's no "batch mode" or
"non-batch mode", right?

> The batching will continue until either an in-flight batch completes or
we hit the N bytes and move to the next partition.  This way it takes just
5 records to get to batching mode, not 5 x number of partition records, and
the batching mode will stay longer as we'll be batching while waiting for a
request to be completed.

Even with linger.ms=0, before the sender thread is ready, we're always
batching (accumulating) records into batches. So I think the "batch mode"
description is confusing. And that's why I asked you if you have some kind
of "batch switch" here.

2. In the motivation, you mention that one drawback of the current
UniformStickyPartitioner is that "the sticky partitioner doesn't create batches
as efficiently", because it sends out batches with only 1 record (under
linger.ms=0). But I can't tell how the proposed solution fixes this
inefficiency. I still see us sending 1 record per batch. Could you
explain more here?

Thank you.
Luke

On Sat, Nov 6, 2021 at 6:41 AM Artem Livshits
 wrote:

> Hi Luke,
>
> Thank you for your feedback.  I've updated the KIP with your suggestions.
>
> 1. Updated with a better example.
> 2. I removed the reference to ClassicDefaultPartitioner, it was probably
> confusing.
> 3. The logic doesn't rely on checking batches, I've updated the proposal to
> make it more explicit.
> 4. The primary issue (uneven distribution) is described in the linked jira,
> copied an example from jira into the KIP as well.
>
> -Artem
>
>
> On Thu, Nov 4, 2021 at 8:34 PM Luke Chen  wrote:
>
> > Hi Artem,
> > Thanks for the KIP! And thanks for reminding me to complete KIP-782,
> soon.
> > :)
> >
> > Back to the KIP, I have some comments:
> > 1. You proposed to have a new config: "partitioner.sticky.batch.size",
> but
> > I can't see how we're going to use it to make the partitioner better.
> > Please explain more in KIP (with an example will be better as suggestion
> > (4))
> > 2. In the "Proposed change" section, you take an example to use
> > "ClassicDefaultPartitioner", is that referring to the current default
> > sticky partitioner? I think it'd better you name your proposed partition
> > with a different name for distinguish between the default one and new
> one.
> > (Although after implementation, we are going to just use the same name)
> > 3. So, if my understanding is correct, you're going to have a "batch"
> > switch, and before the in-flight is full, it's disabled. Otherwise, we'll
> > enable it. Is that right? Sorry, I don't see any advantage of having this
> > batch switch. Could you explain more?
> > 4. I think it should be more clear if you can have a clear real example
> in
> > the motivation section, to describe what issue we faced using current
> > sticky partitioner. And in proposed changes section, using the same
> > example, to describe more detail about how you fix this issue with your
> > way.
> >
> > Thank you.
> > Luke
> >
> > On Fri, Nov 5, 2021 at 1:38 AM Artem Livshits
> >  wrote:
> >
> > > Hello,
> > >
> > > This is the discussion thread for
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > .
> > >
> > > The proposal is a bug fix for
> > > https://issues.apache.org/jira/browse/KAFKA-10888, but it does
> include a
> > > client config change, therefore we have a KIP to discuss.
> > >
> > > -Artem
> > >
> >
>


Re: [VOTE] KIP-791: Add Record Metadata to State Store Context

2021-11-08 Thread Luke Chen
Hi Patrick,
Thanks for the KIP.
Adding RecordMetadata into StateStoreContext for offset updating makes
sense to me.
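
As a purely illustrative sketch of that use case (assuming the accessor ends
up looking roughly like Optional<RecordMetadata> recordMetadata() on
StateStoreContext, as proposed in the KIP), a store could track the position
of the record it is currently writing like this:

{code:java}
import java.util.Optional;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;

// Helper a custom state store could call from its write path once init() has
// captured the StateStoreContext. The recordMetadata() accessor is the one
// proposed in KIP-791 and is assumed here, not part of a released API.
public final class StorePositionTracker {
    private int lastPartition = -1;
    private long lastOffset = -1L;

    public void update(final StateStoreContext context) {
        final Optional<RecordMetadata> metadata = context.recordMetadata();
        // Metadata is expected to be absent outside of active processing
        // (e.g. during restoration), hence the Optional handling.
        metadata.ifPresent(meta -> {
            lastPartition = meta.partition();
            lastOffset = meta.offset();
        });
    }

    public int lastPartition() {
        return lastPartition;
    }

    public long lastOffset() {
        return lastOffset;
    }
}
{code}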

+1 (non-binding)

Thank you.
Luke


On Mon, Nov 8, 2021 at 5:18 PM Patrick Stuedi 
wrote:

> Hi all,
>
> Thanks for the feedback on KIP-791, I have updated the KIP and would like
> to start the voting.
>
> The KIP can be found here:
> https://cwiki.apache.org/confluence/x/I5BnCw
>
> Please vote in this thread.
>
> Thanks!
> -Patrick
>


Re: [DISCUSS] KIP-719: Add Log4J2 Appender

2021-11-08 Thread Mickael Maison
Hi Dongjin,

Thanks for working on the update to log4j2, it's definitely
something we should complete.
I have a couple of comments:

1) Is the KIP proposing to replace the existing log4j-appender or
simply add a new one for log4j2? Reading the KIP with its current
title, it's not entirely explicit; for example, I don't see a statement
under the Proposed Changes section. The PR seems to only add a new
appender, but the KIP mentions we want to fully remove dependencies on
log4j.

2) Under Rejected Alternative, the KIP states: "the Kafka appender
provided by log4j2 community stores log message in the Record key".
Looking at the code, it looks like the log message is stored in the
Record value: 
https://github.com/apache/logging-log4j2/blob/master/log4j-kafka/src/main/java/org/apache/logging/log4j/kafka/appender/KafkaManager.java#L135
Am I missing something?
Comparing it with the proposed new appender, apart from their
configuration formats (hence the backwards compatibility issues), they
both work pretty much the same way, so it's not clear the new one would add a
ton of value.

At a glance (I've not looked at it extensively), it does not look
very hard to migrate to the appender from the logging team. I was
wondering if we should mention it in our documentation, but I was not
able to find any references to the log4j-appender in the Kafka docs:
https://github.com/apache/kafka-site/search?q=KafkaLog4jAppender

As the current log4j-appender is not even deprecated yet, in theory we
can't remove it until Kafka 4. If we want to speed up the process, I
wonder whether the lack of existing documentation, together with a migration
guide, could help us. What do you think?

Thanks,
Mickael




On Fri, Jun 11, 2021 at 4:57 PM Boojapho O  wrote:
>
> Continuing to use log4j would leave several known security vulnerabilities in 
> Apache Kafka, including https://nvd.nist.gov/vuln/detail/CVE-2019-17571.  The 
> Apache log4j team will not fix this vulnerability and is urging an upgrade to 
> log4j2.  See https://logging.apache.org/log4j/1.2/ for further information.
>
> This is desperately needed in Apache 3.0 to keep the software secure.
>
> On 2021/05/26 12:31:20, Dongjin Lee  wrote:
> > CC'd the +1ers of KIP-653 with detailed context:
> >
> > When I submitted and got the approval of KIP-653: Upgrade log4j to log4j2
> > ,
> > I thought the log4j2-appender should not be the scope of the work. But it
> > was wrong.
> >
> > Since the VerifiableLog4jAppender tool is built upon log4j-appender, log4j
> > 1.x artifact will co-exist with log4j2 artifact in the classpath within
> > this scheme. Since the log4j 1.x code is not called anymore, I thought it
> > is not problematic but actually, it was not - when I started to provide a
> > preview of KIP-653
> > , some
> > users reported that sometimes slf4j fails to find the appropriate binding
> > within the classpath, resulting fail to append the log message.
> >
> > To resolve this problem, I subtly adjusted the scope of the work; I
> > excluded Tools and Trogdor from KIP-653 and extended KIP-719 to take care
> > of them instead, along with providing log4j2-appender. It is why the
> > current WIP implementations include some classpath logic in the shell
> > script and *why KIP-653 only can't complete the log4j2 migration*.
> >
> > I hope you will check this proposal out.
> >
> > Best,
> > Dongjin
> >
> > On Tue, May 25, 2021 at 10:43 PM Dongjin Lee  wrote:
> >
> > > Bumping up the discussion thread.
> > >
> > > Recently, I updated the document of KIP-653: Upgrade log4j to log4j2
> > > 
> > >  (accepted)
> > > and KIP-719: Add Log4J2 Appender
> > > 
> > >  (under
> > > discussion) reflecting the recent changes to our codebase. Especially:
> > >
> > > 1. KIP-653 document
> > > 
> > >  now
> > > explains which modules will be migrated and why.
> > > 2. KIP-719 document
> > > 
> > >  now
> > > explains not only the log4j2-appender plan but also upgrading the omitted
> > > modules in KIP-653 into log4j2.
> > >
> > > As you can see here, those two KIPs are the different parts of the same
> > > problem. I believe the community will have a good grasp on why both KIPs
> > > are best if released altogether.
> > >
> > > I will open the voting thread now, and please leave a vote if you are
> > > interested in this issue.
> > >
> > > Best,
> > > Dongjin
> > >
> > > On Tue, Mar 2, 2021 at 5:00 PM Dongjin Lee  wrote:
> > >
> > >> Hi Kafka dev,
> > >>
> > >> I would like to start the discussion of KIP-719: Add Log4J2 Appender.
> > >>
> > >>
> > >> https:

Re: [VOTE] KIP-791: Add Record Metadata to State Store Context

2021-11-08 Thread Vasiliki Papavasileiou
Hi Patrick,

Having the recordMetadata available in the state stores is fundamental for
the consistency work and the proposed approach is reasonable.

+1 (non-binding)

Thank you,
Vicky

On Mon, Nov 8, 2021 at 10:00 AM Luke Chen  wrote:

> Hi Patrick,
> Thanks for the KIP.
> Adding RecordMetadata into StateStoreContext for offset updating makes
> sense to me.
>
> +1 (non-binding)
>
> Thank you.
> Luke
>
>
> On Mon, Nov 8, 2021 at 5:18 PM Patrick Stuedi  >
> wrote:
>
> > Hi all,
> >
> > Thanks for the feedback on KIP-791, I have updated the KIP and would like
> > to start the voting.
> >
> > The KIP can be found here:
> > https://cwiki.apache.org/confluence/x/I5BnCw
> >
> > Please vote in this thread.
> >
> > Thanks!
> > -Patrick
> >
>


[DISCUSS] KIP-784: Add public APIs for AbstractCoordinator

2021-11-08 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi Kafka devs,

I would like to start the discussion of  KIP-784: Add public APIs for 
AbstractCoordinator

https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+public+APIs+for+AbstractCoordinator

Looking forward to some feedback from the community.

Regards,
Hector



DISMISS - Re:[DISCUSS] KIP-784: Add public APIs for AbstractCoordinator

2021-11-08 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Please dismiss this message, as the KIP number is wrong.

I'll send a new message with the correct KIP shortly. Apologies.

From: dev@kafka.apache.org At: 11/08/21 08:45:22 UTC-5:00To:  
dev@kafka.apache.org
Subject: [DISCUSS] KIP-784: Add public APIs for AbstractCoordinator

Hi Kafka devs,

I would like to start the discussion of  KIP-784: Add public APIs for 
AbstractCoordinator

https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+public+APIs+for+AbstractCoordinator

Looking forward for some feedback from the community.

Regards,
Hector




[DISCUSS] KIP-795: Add public APIs for AbstractCoordinator

2021-11-08 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hi Kafka devs,

I would like to start the discussion of  KIP-795: Add public APIs for 
AbstractCoordinator

https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator

Looking forward to some feedback from the community.

Regards,
Hector

[VOTE] KIP-779: Allow Source Tasks to Handle Producer Exceptions

2021-11-08 Thread Knowles Atchison Jr
Good morning,

I'd like to start a vote for KIP-779: Allow Source Tasks to Handle Producer
Exceptions:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-779%3A+Allow+Source+Tasks+to+Handle+Producer+Exceptions

The purpose of this KIP is to give Source Tasks the option to "ignore"
Kafka producer exceptions. After a few iterations, this is now part of the
"errors.tolerance" configuration and provides a null RecordMetadata to
commitRecord() in lieu of a new SourceTask interface method or worker
configuration item.
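
To illustrate what that looks like from a connector's point of view, here is a
rough sketch (the markSkipped/markCommitted helpers are hypothetical, and the
null-metadata behaviour is the one proposed by the KIP, not current released
behaviour):

{code:java}
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class SkippingSourceTask extends SourceTask {

    @Override
    public void commitRecord(final SourceRecord record, final RecordMetadata metadata)
            throws InterruptedException {
        if (metadata == null) {
            // Under the proposed errors.tolerance behaviour, null metadata means the
            // producer gave up on this record; advance our own source offset anyway
            // so we do not re-read it forever. (markSkipped is a hypothetical helper.)
            markSkipped(record);
        } else {
            // Normal path: the record was written to Kafka at metadata.offset().
            markCommitted(record, metadata.offset());
        }
    }

    protected abstract void markSkipped(SourceRecord record);

    protected abstract void markCommitted(SourceRecord record, long kafkaOffset);
}
{code}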

PR is here:

https://github.com/apache/kafka/pull/11382

Any comments and feedback are welcome.

Knowles


Re: Automation Script : Kafka topic creation

2021-11-08 Thread Kafka Life
Thank you Men and Ran



On Sat, Nov 6, 2021 at 7:23 PM Men Lim  wrote:

> I'm currently using Kafka-gitops.
>
> On Sat, Nov 6, 2021 at 3:35 AM Kafka Life  wrote:
>
> > Dear Kafka experts
> >
> > does anyone have ready /automated script to create /delete /alter topics
> in
> > different environments?
> > taking Configuration parameter as input .
> >
> > if yes i request you to kindly share it with me .. please
> >
>
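
For anyone looking for a starting point, here is a minimal, illustrative sketch
of scripted topic creation with the Java Admin client; the bootstrap server,
topic name, partition count, and configs below are placeholders that a real
script would read from per-environment configuration:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public final class TopicCreator {
    public static void main(final String[] args) throws Exception {
        final String bootstrap = args.length > 0 ? args[0] : "localhost:9092"; // placeholder
        final String topic = args.length > 1 ? args[1] : "example-topic";      // placeholder

        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

        try (Admin admin = Admin.create(props)) {
            // Partition count, replication factor, and topic configs would come
            // from the environment-specific input in a real script.
            final NewTopic newTopic = new NewTopic(topic, 3, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(newTopic)).all().get();
        }
    }
}
{code}

Deleting or altering topics follows the same pattern via Admin#deleteTopics and
Admin#incrementalAlterConfigs.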


Re: [DISCUSS] KIP-794: Strictly Uniform Sticky Partitioner

2021-11-08 Thread Justine Olshan
Hi Artem,
Thanks for working on improving the Sticky Partitioner!

I had a few questions about this portion:

*The batching will continue until either an in-flight batch completes or we
hit the N bytes and move to the next partition.  This way it takes just 5
records to get to batching mode, not 5 x number of partition records, and
the batching mode will stay longer as we'll be batching while waiting for a
request to be completed.  As the production rate accelerates, the logic
will automatically switch to use larger batches to sustain higher
throughput.*

*If one of the brokers has higher latency the records for the partitions
hosted on that broker are going to form larger batches, but it's still
going to be the same *amount records* sent less frequently in larger
batches, the logic automatically adapts to that.*

I was curious about how the logic automatically switches here. It seems
like we are just adding *partitioner.sticky.batch.size*, which seems like a
static value. Can you go into more detail about this logic? Or clarify
something I may have missed.

On Mon, Nov 8, 2021 at 1:34 AM Luke Chen  wrote:

> Thanks Artem,
> It's much better now.
> I've got your idea. In KIP-480: Sticky Partitioner, we'll change partition
> (call partitioner) when either 1 of below condition match
> 1. the batch is full
> 2. when linger.ms is up
> But, you are changing the definition, into a
> "partitioner.sticky.batch.size" size is reached.
>
> It'll fix the uneven distribution issue, because we did the sent out size
> calculation in the producer side.
> But it might have another issue that when the producer rate is low, there
> will be some period of time the distribution is not even. Ex:
> tp-1: 12KB
> tp-2: 0KB
> tp-3: 0KB
> tp-4: 0KB
> while the producer is still keeping sending records into tp-1 (because we
> haven't reached the 16KB threshold)
> Maybe the user should set a good value to "partitioner.sticky.batch.size"
> to fix this issue?
>
> Some comment to the KIP:
> 1. This paragraph is a little confusing, because there's no "batch mode" or
> "non-batch mode", right?
>
> > The batching will continue until either an in-flight batch completes or
> we hit the N bytes and move to the next partition.  This way it takes just
> 5 records to get to batching mode, not 5 x number of partition records, and
> the batching mode will stay longer as we'll be batching while waiting for a
> request to be completed.
>
> Even with linger.ms=0, before the sender thread is ready, we're always
> batching (accumulating) records into batches. So I think the "batch mode"
> description is confusing. And that's why I asked you if you have some kind
> of "batch switch" here.
>
> 2. In motivation, you mentioned 1 drawback of current
> UniformStickyPartitioner is "the sticky partitioner doesn't create batches
> as efficiently", because it sent out a batch with only 1 record (under
> linger.ms=0). But I can't tell how you fix this un-efficient issue in the
> proposed solution. I still see we sent 1 record within 1 batch. Could you
> explain more here?
>
> Thank you.
> Luke
>
> On Sat, Nov 6, 2021 at 6:41 AM Artem Livshits
>  wrote:
>
> > Hi Luke,
> >
> > Thank you for your feedback.  I've updated the KIP with your suggestions.
> >
> > 1. Updated with a better example.
> > 2. I removed the reference to ClassicDefaultPartitioner, it was probably
> > confusing.
> > 3. The logic doesn't rely on checking batches, I've updated the proposal
> to
> > make it more explicit.
> > 4. The primary issue (uneven distribution) is described in the linked
> jira,
> > copied an example from jira into the KIP as well.
> >
> > -Artem
> >
> >
> > On Thu, Nov 4, 2021 at 8:34 PM Luke Chen  wrote:
> >
> > > Hi Artem,
> > > Thanks for the KIP! And thanks for reminding me to complete KIP-782,
> > soon.
> > > :)
> > >
> > > Back to the KIP, I have some comments:
> > > 1. You proposed to have a new config: "partitioner.sticky.batch.size",
> > but
> > > I can't see how we're going to use it to make the partitioner better.
> > > Please explain more in KIP (with an example will be better as
> suggestion
> > > (4))
> > > 2. In the "Proposed change" section, you take an example to use
> > > "ClassicDefaultPartitioner", is that referring to the current default
> > > sticky partitioner? I think it'd better you name your proposed
> partition
> > > with a different name for distinguish between the default one and new
> > one.
> > > (Although after implementation, we are going to just use the same name)
> > > 3. So, if my understanding is correct, you're going to have a "batch"
> > > switch, and before the in-flight is full, it's disabled. Otherwise,
> we'll
> > > enable it. Is that right? Sorry, I don't see any advantage of having
> this
> > > batch switch. Could you explain more?
> > > 4. I think it should be more clear if you can have a clear real example
> > in
> > > the motivation section, to describe what issue we faced using current
> > > sticky partitioner. And in proposed c

Kafka : High availability settings

2021-11-08 Thread Kafka Life
Dear Kafka experts

I have a 10-broker Kafka cluster with all topics having a replication factor
of 3 and 50 partitions.

min.insync.replicas is 2.


One broker went down due to a hardware failure, but many applications
complained that they were not able to produce/consume messages.

Please suggest how I can overcome this problem and keep Kafka highly
available even while a broker is down or during rolling
restarts.

Is there a topic-level configuration I can set so that new
partitions are created on the active, running brokers when a node is down?
I have read about acks=0/1/all being set on the application/producer side, but
the applications are not ready to change to acks=all.
Can you please suggest?

Many thanks in advance


Re: [VOTE] KIP-791: Add Record Metadata to State Store Context

2021-11-08 Thread Guozhang Wang
+1, thanks Patrick!


Guozhang

On Mon, Nov 8, 2021 at 5:44 AM Vasiliki Papavasileiou
 wrote:

> Hi Patrick,
>
> Having the recordMetadata available in the state stores is fundamental for
> the consistency work and the proposed approach is reasonable.
>
> +1 (non-binding)
>
> Thank you,
> Vicky
>
> On Mon, Nov 8, 2021 at 10:00 AM Luke Chen  wrote:
>
> > Hi Patrick,
> > Thanks for the KIP.
> > Adding RecordMetadata into StateStoreContext for offset updating makes
> > sense to me.
> >
> > +1 (non-binding)
> >
> > Thank you.
> > Luke
> >
> >
> > On Mon, Nov 8, 2021 at 5:18 PM Patrick Stuedi
>  > >
> > wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedback on KIP-791, I have updated the KIP and would
> like
> > > to start the voting.
> > >
> > > The KIP can be found here:
> > > https://cwiki.apache.org/confluence/x/I5BnCw
> > >
> > > Please vote in this thread.
> > >
> > > Thanks!
> > > -Patrick
> > >
> >
>


-- 
-- Guozhang


Re: Kafka : High availability settings

2021-11-08 Thread sunil chaudhari
Hi,
You can try reducing min.insync.replicas to 1.
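
For reference, a minimal sketch of applying that topic-level override with the
Java Admin client (bootstrap server and topic name are placeholders). Keep in
mind that with min.insync.replicas=1 an acks=all produce can be acknowledged
by a single replica, so this trades durability for availability:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public final class LowerMinIsr {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            final ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
            final AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("min.insync.replicas", "1"), AlterConfigOp.OpType.SET);
            final Map<ConfigResource, Collection<AlterConfigOp>> updates =
                    Collections.singletonMap(topic, Collections.singletonList(op));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}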



On Tue, 9 Nov 2021 at 1:56 AM, Kafka Life  wrote:

> Dear Kafka experts
>
> i have a 10 broker kafka cluster with all topics having replication factor
> as 3 and partition 50
>
> min.in.synch replicas is 2.
>
>
> One broker went down for a hardware failure, but many applications
> complained they are not able to produce /consume messages.
>
> I request you to please suggest, how do i overcome this problem and make
> kafka high available even during broker being down or during rolling
> restarts.
>
> IS there a configuration at a topic level i can set it up to have new
> partition created in active and running brokers when a node is down ?
> i read through ack -0/1/all to be set at application /producer end .But
> applications are not ready to change ack all .
> Can you please suggest .
>
> many thanks in advance
>


Re: [DISCUSS] KIP-794: Strictly Uniform Sticky Partitioner

2021-11-08 Thread Artem Livshits
Hi Luke, Justine,

Thank you for the feedback and questions. I've added clarifications to the KIP.

> there will be some period of time the distribution is not even.

That's correct.  There would be a small temporary imbalance, but over time
the distribution should be uniform.

> 1. This paragraph is a little confusing, because there's no "batch mode"
or "non-batch mode", right?

Updated the wording to not use "batch mode"

> 2. In motivation, you mentioned 1 drawback of current
UniformStickyPartitioner is

The problem with the current implementation is that it switches once a new
batch is created, which may happen after the first record when linger.ms=0.
The new implementation won't switch after the batch, so even if the first
record got sent out in a batch, the second record would be produced to the
same partition.  Once we have 5 batches in-flight, the new records will
pile up in the accumulator.

> I was curious about how the logic automatically switches here.

Added some clarifications to the KIP.  Basically, because we can only have
5 in-flight batches, as soon as the first 5 are in-flight, the records
start piling up in the accumulator.  If the rate is low, records get sent
quickly (e.g. if we have latency 50ms, and produce less than 20 rec / sec,
then each record will often get sent in its own batch, because a batch
would often complete before a new record arrives).  If the rate is high,
then the first few records get sent quickly, but then records will batch
together until one of the in-flight batches completes; the higher the rate
(or the latency), the larger the batches.
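
To put rough numbers on that (a back-of-the-envelope illustration only, not
actual producer code): assume a 50 ms round trip, and that once the in-flight
limit is reached the records arriving during one round trip share the next
batch.

{code:java}
public final class BatchingEstimate {
    public static void main(final String[] args) {
        final double latencyMs = 50.0;                    // assumed broker round-trip latency
        final double[] ratesPerSec = {10, 20, 100, 1000}; // assumed produce rates
        for (final double rate : ratesPerSec) {
            // Records arriving while an in-flight request completes join the next batch,
            // so the expected batch size grows with rate x latency (at least 1 record).
            final double recordsPerBatch = Math.max(1.0, rate * latencyMs / 1000.0);
            System.out.printf("rate=%.0f rec/s -> ~%.1f records per batch%n", rate, recordsPerBatch);
        }
    }
}
{code}

At 10-20 records/sec this gives roughly one record per batch, matching the
"each record in its own batch" case above; at 1000 records/sec it gives ~50
records per batch before the 16KB/KIP-782 limits come into play.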

This is not new logic, btw; this is how it works now. The new change just
helps to utilize it better by giving the partition an opportunity to hit
5 in-flight batches and start accumulating sooner. KIP-782 will make this even
better, so batches could also grow beyond 16KB if the production rate is high.

-Artem


On Mon, Nov 8, 2021 at 11:56 AM Justine Olshan 
wrote:

> Hi Artem,
> Thanks for working on improving the Sticky Partitioner!
>
> I had a few questions about this portion:
>
> *The batching will continue until either an in-flight batch completes or we
> hit the N bytes and move to the next partition.  This way it takes just 5
> records to get to batching mode, not 5 x number of partition records, and
> the batching mode will stay longer as we'll be batching while waiting for a
> request to be completed.  As the production rate accelerates, the logic
> will automatically switch to use larger batches to sustain higher
> throughput.*
>
> *If one of the brokers has higher latency the records for the partitions
> hosted on that broker are going to form larger batches, but it's still
> going to be the same *amount records* sent less frequently in larger
> batches, the logic automatically adapts to that.*
>
> I was curious about how the logic automatically switches here. It seems
> like we are just adding *partitioner.sticky.batch.size *which seems like a
> static value. Can you go into more detail about this logic? Or clarify
> something I may have missed.
>
> On Mon, Nov 8, 2021 at 1:34 AM Luke Chen  wrote:
>
> > Thanks Artem,
> > It's much better now.
> > I've got your idea. In KIP-480: Sticky Partitioner, we'll change
> partition
> > (call partitioner) when either 1 of below condition match
> > 1. the batch is full
> > 2. when linger.ms is up
> > But, you are changing the definition, into a
> > "partitioner.sticky.batch.size" size is reached.
> >
> > It'll fix the uneven distribution issue, because we did the sent out size
> > calculation in the producer side.
> > But it might have another issue that when the producer rate is low, there
> > will be some period of time the distribution is not even. Ex:
> > tp-1: 12KB
> > tp-2: 0KB
> > tp-3: 0KB
> > tp-4: 0KB
> > while the producer is still keeping sending records into tp-1 (because we
> > haven't reached the 16KB threshold)
> > Maybe the user should set a good value to "partitioner.sticky.batch.size"
> > to fix this issue?
> >
> > Some comment to the KIP:
> > 1. This paragraph is a little confusing, because there's no "batch mode"
> or
> > "non-batch mode", right?
> >
> > > The batching will continue until either an in-flight batch completes or
> > we hit the N bytes and move to the next partition.  This way it takes
> just
> > 5 records to get to batching mode, not 5 x number of partition records,
> and
> > the batching mode will stay longer as we'll be batching while waiting
> for a
> > request to be completed.
> >
> > Even with linger.ms=0, before the sender thread is ready, we're always
> > batching (accumulating) records into batches. So I think the "batch mode"
> > description is confusing. And that's why I asked you if you have some
> kind
> > of "batch switch" here.
> >
> > 2. In motivation, you mentioned 1 drawback of current
> > UniformStickyPartitioner is "the sticky partitioner doesn't create
> batches
> > as efficiently", because it sent out a batch with on

Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-08 Thread Guozhang Wang
Hello David,

Thanks for the very nice writeup! It helped me a lot to refresh my memory
on KIP-630/590/584 :)

I just had two clarification questions after reading through the KIP:

1. For the initialization procedure, do we guarantee that all the quorum
nodes (inactive candidates and leaders, a.k.a. controllers) would always
initialize with the same metadata.version? If yes, how is that guaranteed?
More specifically, when a quorum candidate is starting up, would it avoid
handling any controller requests (including APIVersionsRequest) from its
peers in the quorum until it completes reading the local log? And even if
yes, what would happen if there is no FeatureLevelRecord found and
different nodes read different values from their local meta.properties files
or initialize from their binaries' hard-coded values?

2. This is not only about metadata.version itself, but about feature versions
in general: when a version is upgraded or downgraded, since brokers
read the FeatureLevelRecord at their own pace, there will always be a
race window in which some brokers have processed the record and completed the
upgrade while others have not, and hence may behave differently --- I'm
thinking of future features like the specific replica selector that allows
fetching from followers, and even more advanced selectors --- i.e. should we
consider letting clients talk only to brokers with the highest metadata
log offsets, for example?


Guozhang




On Fri, Nov 5, 2021 at 3:18 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the reply.
>
> 16. My first concern is that the KIP picks up meta.version inconsistently
> during the deployment. If a new cluster is started, we pick up the highest
> version. If we upgrade, we leave the feature version unchanged.
> Intuitively, it seems that independent of how a cluster is deployed, we
> should always pick the same feature version. I think we need to think
> this through in this KIP. My second concern is that as a particular version
> matures, it's inconvenient for a user to manually upgrade every feature
> version. As long as we have a path to achieve that in the future, we don't
> need to address that in this KIP.
>
> 21. "./kafka-features.sh delete": Deleting a feature seems a bit weird
> since the logic is always there. Would it be better to use disable?
>
> Jun
>
> On Fri, Nov 5, 2021 at 8:11 AM David Arthur
>  wrote:
>
> > Colin and Jun, thanks for the additional comments!
> >
> > Colin:
> >
> > > We've been talking about having an automated RPC compatibility checker
> >
> > Do we have a way to mark fields in schemas as deprecated? It can stay in
> > the RPC, it just complicates the logic a bit.
> >
> > > It would be nice if the active controller could validate that a
> majority
> > of the quorum could use the proposed metadata.version. The active
> > controller should have this information, right? If we don't have recent
> > information  from a quorum of voters, we wouldn't be active.
> >
> > I believe we should have this information from the ApiVersionsResponse.
> It
> > would be good to do this validation to avoid a situation where a
> > quorum leader can't be elected due to unprocessable records.
> >
> > > Do we need delete as a command separate from downgrade?
> >
> > I think from an operator's perspective, it is nice to distinguish between
> > changing a feature flag and unsetting it. It might be surprising to an
> > operator to see the flag's version set to nothing when they requested the
> > downgrade to version 0 (or less).
> >
> > > it seems like we should spell out that metadata.version begins at 1 in
> > KRaft clusters
> >
> > I added this text:
> >
> > Introduce an IBP version to indicate the lowest software version that
> > > supports *metadata.version*. Below this IBP, the *metadata.version* is
> > > undefined and will not be examined. At or above this IBP, the
> > > *metadata.version* must be *0* for ZooKeeper clusters and will be
> > > initialized as *1* for KRaft clusters.
> >
> >
> > > We probably also want an RPC implemented by both brokers and
> controllers
> > that will reveal the min and max supported versions for each feature
> level
> > supported by the server
> >
> > This is available in ApiVersionsResponse (we include the server's
> supported
> > features as well as the cluster's finalized features)
> >
> > 
> >
> > Jun:
> >
> > 12. I've updated the KIP with AdminClient changes
> >
> > 14. You're right, it looks like I missed a few sections regarding
> snapshot
> > generation. I've corrected it
> >
> > 16. This feels more like an enhancement to KIP-584. I agree it could be
> > useful, but perhaps we could address it separately from KRaft upgrades?
> >
> > 20. Indeed snapshots are not strictly necessary during an upgrade, I've
> > reworded this
> >
> >
> > Thanks!
> > David
> >
> >
> > On Thu, Nov 4, 2021 at 6:51 PM Jun Rao  wrote:
> >
> > > Hi, David, Jose and Colin,
> > >
> > > Thanks for the reply. A few more comments.
> > >
> > > 12. It seems that we haven't

Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #555

2021-11-08 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 5187186 lines...]
[Gradle test output from the :core:integrationTest and :streams:integrationTest
tasks elided; the archived log is cut off mid-line.]

[jira] [Created] (KAFKA-13437) Broker parameter optimization: security.inter.broker.protocol and num.network.threads

2021-11-08 Thread RivenSun (Jira)
RivenSun created KAFKA-13437:


 Summary: Broker parameter optimization: 
security.inter.broker.protocol and num.network.threads
 Key: KAFKA-13437
 URL: https://issues.apache.org/jira/browse/KAFKA-13437
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: RivenSun


h1. 1. security.inter.broker.protocol

First, see the documentation for this parameter:
{code:java}
security.inter.broker.protocolSecurity protocol used to communicate between 
brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an 
error to set this and inter.broker.listener.name properties at the same time. 
{code}
The documentation does not make clear that, when this configuration is used, the
final value of InterBrokerListenerName is simply the value of
security.inter.broker.protocol. I originally thought it would find a suitable
listenerName from the listener.security.protocol.map configuration.

The result is that broker startup fails:

 
{code:java}
[2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception 
(kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: 
inter.broker.listener.name must be a listener name defined in 
advertised.listeners. The valid options based on currently configured listeners 
are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
        at scala.Predef$.require(Predef.scala:337)
        at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
        at kafka.server.KafkaConfig.(KafkaConfig.scala:1897)
        at kafka.server.KafkaConfig.(KafkaConfig.scala:1394)
        at kafka.Kafka$.buildServer(Kafka.scala:67)
        at kafka.Kafka$.main(Kafka.scala:87)
        at kafka.Kafka.main(Kafka.scala)
 {code}
 

 
h1. 2. num.network.threads

The network threads controlled by this parameter are not shared across all
listeners; instead, each listener creates its own set of networkProcessors of
the same size, which causes the Kafka process to open many unnecessary threads
and wastes resources (e.g. with three listeners and num.network.threads=8, a
broker creates 24 network processor threads even if only one listener carries
significant traffic).
For example:
listenerNameA: used for communication between brokers
listenerNameB: used by clients to produce and fetch messages
listenerNameC: used by Kafka operations personnel to manage the cluster and
issue control-type requests, such as deleting topics or adding partitions


So, as expected, num.network.threads for listenerNameB should be increased,
while the network threads for the other two listeners can be reduced.

 
h1. Root cause

1. See the "getInterBrokerListenerNameAndSecurityProtocol" method in
KafkaConfig.scala:
{code:java}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
  Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
    case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
      throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
        s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
    case Some(name) =>
      val listenerName = ListenerName.normalised(name)
      val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
        throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
          s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
      (listenerName, securityProtocol)
    case None =>
      val securityProtocol =
        getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
          KafkaConfig.InterBrokerSecurityProtocolProp)
      (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
  }
}
{code}
ListenerName.forSecurityProtocol(securityProtocol) limits the value of 
InterBrokerListenerName to the value of securityProtocol.name

2. See "addDataPlaneProcessors" method in SocketServer.scala

In this method, processors of the size of newProcessorsPerListener are created 
for each EndPoint, the value of newProcessorsPerListener is 
config.numNetworkThreads

 
h1. Suggestions
 # Optimize the getInterBrokerListenerNameAndSecurityProtocol method:
use listenerSecurityProtocolMap to find a suitable listenerName for
security.inter.broker.protocol.
If multiple keys in listenerSecurityProtocolMap map to the value of
security.inter.broker.protocol, return the listenerName corresponding to the
first such key.
 # Allow the number of network threads to be configured separately for each
listenerName, following the pattern of the parameter
sasl.server.callback.handler.class:
num.network.threads is used as the default value, and
*listener.name.\{listenerName}.num.network.threads* is used as the per-listener
override.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)