[jira] [Created] (KAFKA-13487) Create a topic partition directory based on the size of the directory

2021-11-29 Thread Gongwenzhou (Jira)
Gongwenzhou created KAFKA-13487:
---

 Summary: Create a topic partition directory based on the size of 
the directory
 Key: KAFKA-13487
 URL: https://issues.apache.org/jira/browse/KAFKA-13487
 Project: Kafka
  Issue Type: Sub-task
  Components: log
Reporter: Gongwenzhou


When `log.dirs` contains multiple directories, LogManager currently decides which 
base directory should hold a new partition directory based on the number of 
subdirectories in each base directory. In real production environments, however, 
different topic partitions produce files of very different sizes, so disk usage 
across the base directories becomes unbalanced.
We can provide a configuration that determines the strategy LogManager uses to 
place a new partition directory, as follows:
Broker-level configuration item: *log.directory.select.strategy*, with two 
possible values: *partition* and *size*
*partition:* sort by the number of partition directories under each base directory 
in log.dirs (the current strategy for new partitions)
*size:* sort by the total size of each base directory in log.dirs; the smallest 
directory is chosen first



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: KIP-769: Connect API to retrieve connector configuration definitions

2021-11-29 Thread Mickael Maison
Hi Chris,

Yes, to keep compatibility we want a default implementation for
Converter.configs(); I've updated the KIP.
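
For illustration, a minimal sketch of such a backwards-compatible default, 
assuming the method is named config() and returns a ConfigDef (the final KIP may 
differ):
{code:java}
import org.apache.kafka.common.config.ConfigDef;

// Sketch only: existing Converter implementations that predate the change
// would inherit this default and remain source-compatible.
public interface Converter {
    // ... existing fromConnectData/toConnectData methods elided ...

    default ConfigDef config() {
        // An empty ConfigDef means "no declared configuration",
        // which avoids null checks in callers.
        return new ConfigDef();
    }
}
{code}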

Regarding worker plugins, the use case you described seems valuable.
I'd prefer not mixing worker and connector plugins on the same
endpoint but I agree using /plugins and /worker-plugins could be
confusing.

One alternative is to expose all connector-level plugins via the
existing /connector-plugins endpoint. In that case, we'd need to keep
the current JSON schema and not group plugins by type. As the current
schema already has a type field for each entry, we'll still be able to
tell them apart. Then we can have /worker-plugins and a relatively
clean API. What do you think?

Thanks,
Mickael

On Sun, Nov 28, 2021 at 8:21 PM Chris Egerton
 wrote:
>
> Hi Mickael,
>
> I think one potential use case for exposing worker-level plugins is that it
> may make it easier to determine whether a worker is set up correctly (the
> current alternative requires looking through log files and can be a little
> tedious), and might even make it possible to automatically identify
> discrepancies within a cluster by diffing the contents of that endpoint
> across each worker. But I don't think this has to be addressed by the
> current KIP; the only thing that bothers me a little is that "plugins" is
> generic and it may confuse people down the road if we add an endpoint for
> worker-level plugins ("why is one just called 'plugins' and the other one
> is 'worker-plugins'?"). Probably not worth blocking on, though.
>
> Agreed that the suggestion for improved validation should be made on the
> KIP-802 thread.
>
> I also noticed that the newly-proposed config method for the Converter
> interface doesn't have a default implementation, making it
> backwards-incompatible. Should we add a default implementation that returns
> either null or an empty ConfigDef?
>
> Cheers,
>
> Chris
>
> On Fri, Nov 26, 2021 at 8:35 AM Mickael Maison 
> wrote:
>
> > Hi Chris,
> >
> > 1. If we want to expose worker plugins, I think we should do it via a
> > separate endpoint. But to be honest, I'm not even sure I see strong
> > use cases for exposing them as they are either enabled or not and
> > can't be changed at runtime. So I'd prefer to stick to "connector
> > level" plugins in this KIP. Let me now if you have use cases, I'm open
> > to reconsider this choice.
> > I'll add that in the rejected alternatives section for now
> >
> > 2. I remembered seeing issues in the past with multiple plugin.path
> > entries but I tried today and I was able to mix and match plugins from
> > different paths. So my bad for getting confused.
> > Then I agree, it makes more sense to group them by plugin type.
> >
> > 3. Yes this should be covered in KIP-802:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options
> >
> > 4. No particular reason. We can support both formats like today. I've
> > updated the KIP
> >
> > Thanks,
> > Mickael
> >
> >
> >
> > On Tue, Nov 23, 2021 at 6:40 PM Chris Egerton
> >  wrote:
> > >
> > > Hi Mickael,
> > >
> > > I think the increase in scope here is great and the added value certainly
> > > justifies the proposed changes. I have some thoughts but overall I like
> > the
> > > direction this is going in now.
> > >
> > > 1. The new /plugins endpoint is described as containing "all plugins that
> > > are Connectors, Transformations, Converters, HeaderConverters and
> > > Predicates". So essentially, it looks like we want to expose all plugins
> > > that are configured on a per-connector basis, but exclude plugins that
> > are
> > > configured on a per-worker basis (such as config providers and REST
> > > extensions). Do you think it may be valuable to expose information on
> > > worker-level plugins as well?
> > >
> > > 2. The description for the new /plugins endpoint also states that
> > "Plugins
> > > will be grouped by plugin.path. This will make it clear to users what's
> > > available to use as it's not possible to use a Connector from one path
> > with
> > > Transformations from another.". Is this true? I thought that Connect's
> > > classloading made it possible to package
> > > converters/transformations/predicates completely independently from each
> > > other, and to reference them from also-independently-packaged connectors.
> > > If it turns out that this is the case, could we consider restructuring
> > the
> > > response to be grouped by plugin type instead of by classloader? There's
> > > also the ungrouped format proposed in KIP-494 (
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120740150
> > )
> > > which we might consider as well.
> > >
> > > 3. I think this can be left for a follow-up KIP if necessary, but I'm
> > > curious about your thoughts on adding new validate methods to all
> > > connector-level plugins that can be used similarly to how the existing
> > > Connector::validate method (
> > >
> > https://github.com/a

[jira] [Resolved] (KAFKA-13482) JRE: Duplicate Key: Multiple bootstrap server URLs

2021-11-29 Thread Michael Anstis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Anstis resolved KAFKA-13482.

Resolution: Not A Problem

> JRE: Duplicate Key: Multiple bootstrap server URLs
> --
>
> Key: KAFKA-13482
> URL: https://issues.apache.org/jira/browse/KAFKA-13482
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.1
> Environment: Docker (Kafka server), Docker (client) and local client.
>Reporter: Michael Anstis
>Priority: Major
>
> I am running a Kafka server in a Docker Container.
> It needs to listen to both the Docker "internal" network and "external" local 
> network.
> Some services run in other Docker containers and need Kafka.
> Some services run locally, not in Docker, and need the same Kafka instance 
> too.
> Configuring bootstrap servers for the single Kafka instance:
> {code}kafka.bootstrap.servers=PLAINTEXT://localhost:49212,OUTSIDE://kafka-WLff1:9092{code}
> Leads to the following stack trace:
> {code}
> 2021-11-25 16:17:10,559 ERROR [org.apa.kaf.cli.pro.int.Sender] 
> (kafka-producer-network-thread | kafka-producer-kogito-tracing-model) 
> [Producer clientId=kafka-producer-kogito-tracing-model] Uncaught error in 
> kafka producer I/O thread: : java.lang.IllegalStateException: Duplicate key 0 
> (attempted merging values localhost:49212 (id: 0 rack: null) and 
> kafka-WLff1:9092 (id: 0 rack: null))
>   at 
> java.base/java.util.stream.Collectors.duplicateKeyException(Collectors.java:133)
>   at 
> java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:180)
>   at 
> java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>   at 
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>   at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>   at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>   at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>   at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>   at 
> org.apache.kafka.common.requests.MetadataResponse$Holder.createBrokers(MetadataResponse.java:414)
>   at 
> org.apache.kafka.common.requests.MetadataResponse$Holder.(MetadataResponse.java:407)
>   at 
> org.apache.kafka.common.requests.MetadataResponse.holder(MetadataResponse.java:187)
>   at 
> org.apache.kafka.common.requests.MetadataResponse.topicMetadata(MetadataResponse.java:210)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleSuccessfulResponse(NetworkClient.java:1086)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:887)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:570)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Thread.java:832)
> {code}
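
For context, a standalone JDK illustration (not Kafka code) of why Collectors.toMap 
raises this exception when two bootstrap URLs resolve to the same broker id:
{code:java}
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DuplicateKeyDemo {
    record Node(int id, String host) { }   // requires Java 16+; stands in for a broker node

    public static void main(String[] args) {
        // Two distinct addresses that both report broker id 0, mirroring the
        // PLAINTEXT and OUTSIDE listeners of the same broker. Collecting them
        // into a map keyed by broker id throws:
        //   java.lang.IllegalStateException: Duplicate key 0 (attempted merging values ...)
        Map<Integer, Node> byId = Stream.of(new Node(0, "localhost:49212"),
                                            new Node(0, "kafka-WLff1:9092"))
                .collect(Collectors.toMap(Node::id, Function.identity()));
        System.out.println(byId);
    }
}
{code}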



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.1 #24

2021-11-29 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-779: Allow Source Tasks to Handle Producer Exceptions

2021-11-29 Thread Knowles Atchison Jr
Good morning,

Bringing this back to the top.

We currently have

1 binding
4 non-binding

Knowles

On Fri, Nov 19, 2021 at 10:02 AM Knowles Atchison Jr 
wrote:

> Thank you all for voting. We still need two more binding votes.
>
> I have rebased and updated the PR to be ready to go once this vote passes:
>
> https://github.com/apache/kafka/pull/11382
>
> Knowles
>
> On Tue, Nov 16, 2021 at 3:43 PM Chris Egerton 
> wrote:
>
>> +1 (non-binding). Thanks Knowles!
>>
>> On Tue, Nov 16, 2021 at 10:48 AM Arjun Satish 
>> wrote:
>>
>> > +1 (non-binding). Thanks for the KIP, Knowles! and appreciate the
>> > follow-ups!
>> >
>> > On Thu, Nov 11, 2021 at 2:55 PM John Roesler 
>> wrote:
>> >
>> > > Thanks, Knowles!
>> > >
>> > > I'm +1 (binding)
>> > >
>> > > -John
>> > >
>> > > On Wed, 2021-11-10 at 12:42 -0500, Christopher Shannon
>> > > wrote:
>> > > > +1 (non-binding). This looks good to me and will be useful as a way
>> to
>> > > > handle producer errors.
>> > > >
>> > > > On Mon, Nov 8, 2021 at 8:55 AM Knowles Atchison Jr <
>> > > katchiso...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Good morning,
>> > > > >
>> > > > > I'd like to start a vote for KIP-779: Allow Source Tasks to Handle
>> > > Producer
>> > > > > Exceptions:
>> > > > >
>> > > > >
>> > > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-779%3A+Allow+Source+Tasks+to+Handle+Producer+Exceptions
>> > > > >
>> > > > > The purpose of this KIP is to allow Source Tasks the option to
>> > "ignore"
>> > > > > kafka producer exceptions. After a few iterations, this is now
>> part
>> > of
>> > > the
>> > > > > "errors.tolerance" configuration and provides a null
>> RecordMetadata
>> > to
>> > > > > commitRecord() in lieu of a new SourceTask interface method or
>> worker
>> > > > > configuration item.
>> > > > >
>> > > > > PR is here:
>> > > > >
>> > > > > https://github.com/apache/kafka/pull/11382
>> > > > >
>> > > > > Any comments and feedback are welcome.
>> > > > >
>> > > > > Knowles
>> > > > >
>> > >
>> > >
>> > >
>> >
>>
>
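
For illustration, a hedged sketch of what the proposed behaviour means for a 
connector's commitRecord(): with the producer exception tolerated via 
errors.tolerance, the framework passes a null RecordMetadata. The helper method 
below is hypothetical and the error-handling body is only one possible choice:
{code:java}
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Sketch only: a source task that tolerates dropped records when a failed
// (ignored) produce is reported as a null RecordMetadata.
public abstract class TolerantSourceTask extends SourceTask {
    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
        if (metadata == null) {
            // The producer exception was tolerated and the record was dropped;
            // acknowledge it upstream anyway so the task can make progress.
            ackUpstream(record);
            return;
        }
        ackUpstream(record); // record was written successfully
    }

    // Hypothetical helper standing in for connector-specific offset acknowledgement.
    protected abstract void ackUpstream(SourceRecord record);
}
{code}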


[DISCUSS] KIP-805: Add range and scan query support in IQ v2

2021-11-29 Thread Vasiliki Papavasileiou
Hello everyone,

I would like to start the discussion for KIP-805: Add range and scan query
support in IQ v2

The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2

Any suggestions are more than welcome.

Many thanks,
Vicky


Re: [VOTE] KIP-779: Allow Source Tasks to Handle Producer Exceptions

2021-11-29 Thread Mickael Maison
Hi Knowles,

+1 (binding)

Thanks for the KIP!

On Mon, Nov 29, 2021 at 12:56 PM Knowles Atchison Jr
 wrote:
>
> Good morning,
>
> Bringing this back to the top.
>
> We currently have
>
> 1 binding
> 4 non-binding
>
> Knowles
>
> On Fri, Nov 19, 2021 at 10:02 AM Knowles Atchison Jr 
> wrote:
>
> > Thank you all for voting. We still need two more binding votes.
> >
> > I have rebased and updated the PR to be ready to go once this vote passes:
> >
> > https://github.com/apache/kafka/pull/11382
> >
> > Knowles
> >
> > On Tue, Nov 16, 2021 at 3:43 PM Chris Egerton 
> > wrote:
> >
> >> +1 (non-binding). Thanks Knowles!
> >>
> >> On Tue, Nov 16, 2021 at 10:48 AM Arjun Satish 
> >> wrote:
> >>
> >> > +1 (non-binding). Thanks for the KIP, Knowles! and appreciate the
> >> > follow-ups!
> >> >
> >> > On Thu, Nov 11, 2021 at 2:55 PM John Roesler 
> >> wrote:
> >> >
> >> > > Thanks, Knowles!
> >> > >
> >> > > I'm +1 (binding)
> >> > >
> >> > > -John
> >> > >
> >> > > On Wed, 2021-11-10 at 12:42 -0500, Christopher Shannon
> >> > > wrote:
> >> > > > +1 (non-binding). This looks good to me and will be useful as a way
> >> to
> >> > > > handle producer errors.
> >> > > >
> >> > > > On Mon, Nov 8, 2021 at 8:55 AM Knowles Atchison Jr <
> >> > > katchiso...@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > Good morning,
> >> > > > >
> >> > > > > I'd like to start a vote for KIP-779: Allow Source Tasks to Handle
> >> > > Producer
> >> > > > > Exceptions:
> >> > > > >
> >> > > > >
> >> > > > >
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-779%3A+Allow+Source+Tasks+to+Handle+Producer+Exceptions
> >> > > > >
> >> > > > > The purpose of this KIP is to allow Source Tasks the option to
> >> > "ignore"
> >> > > > > kafka producer exceptions. After a few iterations, this is now
> >> part
> >> > of
> >> > > the
> >> > > > > "errors.tolerance" configuration and provides a null
> >> RecordMetadata
> >> > to
> >> > > > > commitRecord() in lieu of a new SourceTask interface method or
> >> worker
> >> > > > > configuration item.
> >> > > > >
> >> > > > > PR is here:
> >> > > > >
> >> > > > > https://github.com/apache/kafka/pull/11382
> >> > > > >
> >> > > > > Any comments and feedback are welcome.
> >> > > > >
> >> > > > > Knowles
> >> > > > >
> >> > >
> >> > >
> >> > >
> >> >
> >>
> >


Re: [DISCUSS] KIP-714: Client metrics and observability

2021-11-29 Thread Magnus Edenhill
Hi Viktor,

that's a good idea, I've added a bunch of broker-side metrics for the
client metrics handling.
There might be more added during development as the need arises.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Newbrokermetrics

Thanks,
Magnus

Den mån 22 nov. 2021 kl 11:08 skrev Viktor Somogyi-Vass <
viktorsomo...@gmail.com>:

> Hi Magnus,
>
> I think this is a very useful addition. We also have a similar (but much
> more simplistic) implementation of this. Maybe I missed it in the KIP but
> what about adding metrics about the subscription cache itself? That I think
> would improve its usability and debuggability as we'd be able to see its
> performance, hit/miss rates, eviction counts and others.
>
> Best,
> Viktor
>
> On Thu, Nov 18, 2021 at 5:12 PM Magnus Edenhill 
> wrote:
>
> > Hi Mickael,
> >
> > see inline.
> >
> > Den ons 10 nov. 2021 kl 15:21 skrev Mickael Maison <
> > mickael.mai...@gmail.com
> > >:
> >
> > > Hi Magnus,
> > >
> > > I see you've addressed some of the points I raised above but some (4,
> > > 5) have not been addressed yet.
> > >
> >
> > Re 4) How will the user/app know metrics are being sent.
> >
> > One possibility is to add a JMX metric (thus for user consumption) for
> the
> > number of metric pushes the
> > client has performed, or perhaps the number of metrics subscriptions
> > currently being collected.
> > Would that be sufficient?
> >
> > Re 5) Metric sizes and rates
> >
> > A worst case scenario for a producer that is producing to 50 unique
> topics
> > and emitting all standard metrics yields
> > a serialized size of around 100KB prior to compression, which compresses
> > down to about 20-30% of that depending
> > on compression type and topic name uniqueness.
> > The numbers for a consumer would be similar.
> >
> > In practice the number of unique topics would be far less, and the
> > subscription set would typically be for a subset of metrics.
> > So we're probably closer to 1kb, or less, compressed size per client per
> > push interval.
> >
> > As both the subscription set and push intervals are controlled by the
> > cluster operator it shouldn't be too hard
> > to strike a good balance between metrics overhead and granularity.
> >
> >
> >
> > >
> > > I'm really uneasy with this being enabled by default on the client
> > > side. When collecting data, I think the best practice is to ensure
> > > users are explicitly enabling it.
> > >
> >
> > Requiring metrics to be explicitly enabled on clients severely cripples
> its
> > usability and value.
> >
> > One of the problems that this KIP aims to solve is for useful metrics to
> be
> > available on demand
> > regardless of the technical expertise of the user. As Ryanne points, out
> a
> > savvy user/organization
> > will typically have metrics collection and monitoring in place already,
> and
> > the benefits of this KIP
> > are then more of a common set and format metrics across client
> > implementations and languages.
> > But that is not the typical Kafka user in my experience, they're not
> Kafka
> > experts and they don't have the
> > knowledge of how to best instrument their clients.
> > Having metrics enabled by default for this user base allows the Kafka
> > operators to proactively and reactively
> > monitor and troubleshoot client issues, without the need for the less
> savvy
> > user to do anything.
> > It is often too late to tell a user to enable metrics when the problem
> has
> > already occurred.
> >
> > Now, to be clear, even though metrics are enabled by default on clients
> it
> > is not enabled by default
> > on the brokers; the Kafka operator needs to build and set up a metrics
> > plugin and add metrics subscriptions
> > before anything is sent from the client.
> > It is opt-out on the clients and opt-in on the broker.
> >
> >
> >
> >
> > > You mentioned brokers already have
> > > some(most?) of the information contained in metrics, if so then why
> > > are we collecting it again? Surely there must be some new information
> > > in the client metrics.
> > >
> >
> > From the user's perspective the Kafka infrastructure extends from
> > producer.send() to
> > messages being returned from consumer.poll(), a giant black box where
> > there's a lot going on between those
> > two points. The brokers currently only see what happens once those
> requests
> > and messages hits the broker,
> > but as Kafka clients are complex pieces of machinery there's a myriad of
> > queues, timers, and state
> > that's critical to the operation and infrastructure that's not currently
> > visible to the operator.
> > Relying on the user to accurately and timely provide this missing
> > information is not generally feasible.
> >
> >
> > Most of the standard metrics listed in the KIP are data points that the
> > broker does not have.
> > Only a small number of metrics are duplicates (like the request counts
> and
> > sizes), but they are in

Re: [DISCUSS] KIP-714: Client metrics and observability

2021-11-29 Thread Magnus Edenhill
Hey Bob,

That's a good point.

Request type labels were considered, but since they're already tracked by
broker-side metrics they were left out to avoid duplication. However, those
broker metrics are not per connection, so they won't be that useful in
practice for troubleshooting specific client instances.

I'll add the request_type label to the relevant metrics.

Thanks,
Magnus
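
For readers following along, a minimal sketch (not the KIP's implementation) of 
how a request_type tag distinguishes per-request-type series, using Kafka's common 
Metrics API; the metric and group names below are illustrative only:
{code:java}
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.protocol.ApiKeys;

public class RequestTypeLabelSketch {
    public static void main(String[] args) {
        try (Metrics metrics = new Metrics()) {
            // One sensor per request type; the request_type tag is what the
            // thread proposes adding to the client.request.* metrics.
            Map<String, String> tags = Map.of("request_type", ApiKeys.PRODUCE.name);
            Sensor produceRtt = metrics.sensor("client.request.rtt." + ApiKeys.PRODUCE.name);
            produceRtt.add(metrics.metricName("client.request.rtt", "client-telemetry", tags), new Avg());

            produceRtt.record(12.3); // record a round-trip-time sample in milliseconds
        }
    }
}
{code}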


Den tis 23 nov. 2021 kl 19:20 skrev Bob Barrett
:

> Hi Magnus,
>
> Thanks for the thorough KIP, this seems very useful.
>
> Would it make sense to include the request type as a label for the
> `client.request.success`, `client.request.errors` and `client.request.rtt`
> metrics? I think it would be very useful to see which specific requests are
> succeeding and failing for a client. One specific case I can think of where
> this could be useful is producer batch timeouts. If a Java application does
> not enable producer client logs (unfortunately, in my experience this
> happens more often than it should), the application logs will only contain
> the expiration error message, but no information about what is causing the
> timeout. The requests might all be succeeding but taking too long to
> process batches, or metadata requests might be failing, or some or all
> produce requests might be failing (if the bootstrap servers are reachable
> from the client but one or more other brokers are not, for example). If the
> cluster operator is able to identify the specific requests that are slow or
> failing for a client, they will be better able to diagnose the issue
> causing batch timeouts.
>
> One drawback I can think of is that this will increase the cardinality of
> the request metrics. But any given client is only going to use a small
> subset of the request types, and since we already have partition labels for
> the topic-level metrics, I think request labels will still make up a
> relatively small percentage of the set of metrics.
>
> Thanks,
> Bob
>
> On Mon, Nov 22, 2021 at 2:08 AM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> wrote:
>
> > Hi Magnus,
> >
> > I think this is a very useful addition. We also have a similar (but much
> > more simplistic) implementation of this. Maybe I missed it in the KIP but
> > what about adding metrics about the subscription cache itself? That I
> think
> > would improve its usability and debuggability as we'd be able to see its
> > performance, hit/miss rates, eviction counts and others.
> >
> > Best,
> > Viktor
> >
> > On Thu, Nov 18, 2021 at 5:12 PM Magnus Edenhill 
> > wrote:
> >
> > > Hi Mickael,
> > >
> > > see inline.
> > >
> > > Den ons 10 nov. 2021 kl 15:21 skrev Mickael Maison <
> > > mickael.mai...@gmail.com
> > > >:
> > >
> > > > Hi Magnus,
> > > >
> > > > I see you've addressed some of the points I raised above but some (4,
> > > > 5) have not been addressed yet.
> > > >
> > >
> > > Re 4) How will the user/app know metrics are being sent.
> > >
> > > One possibility is to add a JMX metric (thus for user consumption) for
> > the
> > > number of metric pushes the
> > > client has performed, or perhaps the number of metrics subscriptions
> > > currently being collected.
> > > Would that be sufficient?
> > >
> > > Re 5) Metric sizes and rates
> > >
> > > A worst case scenario for a producer that is producing to 50 unique
> > topics
> > > and emitting all standard metrics yields
> > > a serialized size of around 100KB prior to compression, which
> compresses
> > > down to about 20-30% of that depending
> > > on compression type and topic name uniqueness.
> > > The numbers for a consumer would be similar.
> > >
> > > In practice the number of unique topics would be far less, and the
> > > subscription set would typically be for a subset of metrics.
> > > So we're probably closer to 1kb, or less, compressed size per client
> per
> > > push interval.
> > >
> > > As both the subscription set and push intervals are controlled by the
> > > cluster operator it shouldn't be too hard
> > > to strike a good balance between metrics overhead and granularity.
> > >
> > >
> > >
> > > >
> > > > I'm really uneasy with this being enabled by default on the client
> > > > side. When collecting data, I think the best practice is to ensure
> > > > users are explicitly enabling it.
> > > >
> > >
> > > Requiring metrics to be explicitly enabled on clients severely cripples
> > its
> > > usability and value.
> > >
> > > One of the problems that this KIP aims to solve is for useful metrics
> to
> > be
> > > available on demand
> > > regardless of the technical expertise of the user. As Ryanne points,
> out
> > a
> > > savvy user/organization
> > > will typically have metrics collection and monitoring in place already,
> > and
> > > the benefits of this KIP
> > > are then more of a common set and format metrics across client
> > > implementations and languages.
> > > But that is not the typical Kafka user in my experience, they're not
> > Kafka
> > > experts and they don't have the
> > > knowledge of how to

[jira] [Resolved] (KAFKA-13200) Fix version of MirrorMaker2 connectors

2021-11-29 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13200.

Fix Version/s: 3.2.0
   Resolution: Fixed

> Fix version of MirrorMaker2 connectors
> --
>
> Key: KAFKA-13200
> URL: https://issues.apache.org/jira/browse/KAFKA-13200
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.2.0
>
>
> MirrorMaker2 connectors have their version hardcoded to 1. Instead they should 
> use the Kafka version, like 2.8.0
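
For illustration, a hedged sketch of the direction of such a fix, assuming the 
connectors delegate to AppInfoParser rather than a hardcoded literal (the actual 
patch may differ):
{code:java}
import org.apache.kafka.common.utils.AppInfoParser;

// Sketch only: report the Kafka release the connector ships with
// instead of a hardcoded "1".
public class VersionedMirrorConnector /* e.g. extends SourceConnector */ {
    public String version() {
        return AppInfoParser.getVersion(); // e.g. "3.2.0" for a 3.2.0 build
    }
}
{code}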



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-779: Allow Source Tasks to Handle Producer Exceptions

2021-11-29 Thread Tom Bentley
Hi Knowles,

Thanks for the KIP, +1 (binding)

Kind regards,

Tom

On 11/29/21, Mickael Maison  wrote:
> Hi Knowles,
>
> +1 (binding)
>
> Thanks for the KIP!
>
> On Mon, Nov 29, 2021 at 12:56 PM Knowles Atchison Jr
>  wrote:
>>
>> Good morning,
>>
>> Bringing this back to the top.
>>
>> We currently have
>>
>> 1 binding
>> 4 non-binding
>>
>> Knowles
>>
>> On Fri, Nov 19, 2021 at 10:02 AM Knowles Atchison Jr
>> 
>> wrote:
>>
>> > Thank you all for voting. We still need two more binding votes.
>> >
>> > I have rebased and updated the PR to be ready to go once this vote
>> > passes:
>> >
>> > https://github.com/apache/kafka/pull/11382
>> >
>> > Knowles
>> >
>> > On Tue, Nov 16, 2021 at 3:43 PM Chris Egerton
>> > 
>> > wrote:
>> >
>> >> +1 (non-binding). Thanks Knowles!
>> >>
>> >> On Tue, Nov 16, 2021 at 10:48 AM Arjun Satish 
>> >> wrote:
>> >>
>> >> > +1 (non-binding). Thanks for the KIP, Knowles! and appreciate the
>> >> > follow-ups!
>> >> >
>> >> > On Thu, Nov 11, 2021 at 2:55 PM John Roesler 
>> >> wrote:
>> >> >
>> >> > > Thanks, Knowles!
>> >> > >
>> >> > > I'm +1 (binding)
>> >> > >
>> >> > > -John
>> >> > >
>> >> > > On Wed, 2021-11-10 at 12:42 -0500, Christopher Shannon
>> >> > > wrote:
>> >> > > > +1 (non-binding). This looks good to me and will be useful as a
>> >> > > > way
>> >> to
>> >> > > > handle producer errors.
>> >> > > >
>> >> > > > On Mon, Nov 8, 2021 at 8:55 AM Knowles Atchison Jr <
>> >> > > katchiso...@gmail.com>
>> >> > > > wrote:
>> >> > > >
>> >> > > > > Good morning,
>> >> > > > >
>> >> > > > > I'd like to start a vote for KIP-779: Allow Source Tasks to
>> >> > > > > Handle
>> >> > > Producer
>> >> > > > > Exceptions:
>> >> > > > >
>> >> > > > >
>> >> > > > >
>> >> > >
>> >> >
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-779%3A+Allow+Source+Tasks+to+Handle+Producer+Exceptions
>> >> > > > >
>> >> > > > > The purpose of this KIP is to allow Source Tasks the option to
>> >> > "ignore"
>> >> > > > > kafka producer exceptions. After a few iterations, this is now
>> >> part
>> >> > of
>> >> > > the
>> >> > > > > "errors.tolerance" configuration and provides a null
>> >> RecordMetadata
>> >> > to
>> >> > > > > commitRecord() in lieu of a new SourceTask interface method or
>> >> worker
>> >> > > > > configuration item.
>> >> > > > >
>> >> > > > > PR is here:
>> >> > > > >
>> >> > > > > https://github.com/apache/kafka/pull/11382
>> >> > > > >
>> >> > > > > Any comments and feedback are welcome.
>> >> > > > >
>> >> > > > > Knowles
>> >> > > > >
>> >> > >
>> >> > >
>> >> > >
>> >> >
>> >>
>> >
>
>



[VOTE] KIP-799: Align behaviour for producer callbacks with documented behaviour

2021-11-29 Thread Séamus Ó Ceanainn
Hi everyone,

I'd like to start a vote for KIP-799: Align behaviour for producer
callbacks with documented behaviour.

The KIP proposes a breaking change to the behaviour of producer client
callbacks: it aligns callback behaviour with the documented behaviour of the
method and makes it consistent with the corresponding methods on producer
client interceptors.
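
For illustration, a minimal sketch of a defensive producer callback; the exact 
post-KIP semantics (null vs. placeholder metadata on error) are defined by the KIP 
itself, so the comments below are hedged rather than definitive:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    // Depending on client version and the outcome of KIP-799, metadata here
                    // may be null or a placeholder with -1 offsets; a defensive callback
                    // keys off the exception rather than the metadata.
                    System.err.println("Send failed: " + exception);
                } else {
                    System.out.printf("Sent to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
            producer.flush();
        }
    }
}
{code}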

Regards,
Séamus.


[jira] [Created] (KAFKA-13488) Producer fails to recover if topic gets deleted (and gets auto-created)

2021-11-29 Thread Prateek Agarwal (Jira)
Prateek Agarwal created KAFKA-13488:
---

 Summary: Producer fails to recover if topic gets deleted (and gets 
auto-created)
 Key: KAFKA-13488
 URL: https://issues.apache.org/jira/browse/KAFKA-13488
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.8.1, 2.7.2, 2.6.3, 2.5.1, 2.4.1, 2.3.1, 2.2.2
Reporter: Prateek Agarwal
Assignee: Prateek Agarwal


Producer currently fails to produce messages to a topic if the topic is deleted 
and gets auto-created OR is created manually during the lifetime of the 
producer (and certain other conditions are met - leaderEpochs of deleted topic 
> 0).

 

To reproduce, these are the steps which can be carried out:

0) Start a cluster with 2 brokers, 0 and 1, with auto.create.topics.enable=true.

1) Create a topic T with 2 partitions: P0 -> (0,1), P1 -> (0,1).

2) Reassign the partitions such that P0 -> (1,0), P1 -> (1,0).

3) Create a producer P and send a few messages, which land on all the 
topic-partitions of topic T.

4) Delete the topic T.

5) Immediately send a new message from producer P; this message will fail to send 
and eventually time out.

A test case which fails with the above steps is added at the end, as well as a 
patch file.

 

This started happening after leaderEpoch (KIP-320) was introduced in the 
MetadataResponse (KAFKA-7738). A fix was attempted in KAFKA-12257, but that fix 
has a bug due to which the above use case still fails.

 

*Issue in the solution of KAFKA-12257:*
{code:java}
// org.apache.kafka.clients.Metadata.handleMetadataResponse():
   ...
        Map<String, Uuid> topicIds = new HashMap<>();
        Map<String, Uuid> oldTopicIds = cache.topicIds();
        for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
            String topicName = metadata.topic();
            Uuid topicId = metadata.topicId();
            topics.add(topicName);
            // We can only reason about topic ID changes when both IDs are valid,
            // so keep oldId null unless the new metadata contains a topic ID
            Uuid oldTopicId = null;
            if (!Uuid.ZERO_UUID.equals(topicId)) {
                topicIds.put(topicName, topicId);
                oldTopicId = oldTopicIds.get(topicName);
            } else {
                 topicId = null;
            }
...
} {code}
With every new call to {{handleMetadataResponse()}}, {{cache.topicIds()}} gets 
created afresh. When a topic is deleted and recreated immediately afterwards 
(because auto-create is enabled), the producer's {{MetadataRequest}} for the 
deleted topic T will result in a {{MetadataResponse}} with an 
{{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error, depending on 
at which point of the topic recreation the metadata is requested. In the error 
case, the topicId returned in the response is {{Uuid.ZERO_UUID}}. As seen in the 
above logic, if the received topicId is zero, the method removes the earlier 
topicId entry from the cache.

Now, when a non-error MetadataResponse does come back for the newly created 
topic T, it will have a non-zero topicId, but the leaderEpoch of the partitions 
will mostly be zero. The new MetadataResponse will be rejected if the older 
leaderEpoch was > 0 (for more details, refer to KAFKA-12257). Because the 
metadata is rejected, the producer never learns the new leaders of the 
topic-partitions of the newly created topic.

 

*1. Solution / Fix (preferred)*:
The client's metadata should keep remembering the old topicId as long as:
1) the response for the topic-partition has errors
2) a topicId entry was already present in the cache earlier
3) the retain time has not expired
{code:java}
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -336,6 +336,10 @@ public class Metadata implements Closeable {
 topicIds.put(topicName, topicId);
 oldTopicId = oldTopicIds.get(topicName);
 } else {
+// Retain the old topicId for comparison with a newer topicId created later. This is only needed till retainMs
+if (metadata.error() != Errors.NONE && oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
+topicIds.put(topicName, oldTopicIds.get(topicName));
+else
 topicId = null;
 }

{code}
*2. Alternative Solution / Fix*:
Allow updates to the leaderEpoch when the originalTopicId was {{null}}. This is 
less desirable because, when a cluster moves from no topic IDs to using topic 
IDs, we would count the topic as new and update the leaderEpoch irrespective of 
whether the new epoch is greater than the current one.
{code:java}
@@ -394,7 +398,7 @@ public class Metadata implements Closeable {
 if (hasReliableLeaderEpoch && 
partitionMetadata.leaderEpoch.isPrese

Re: [VOTE] KIP-779: Allow Source Tasks to Handle Producer Exceptions

2021-11-29 Thread Knowles Atchison Jr
Thank you all for voting!

KIP-779 has been approved:

3 binding votes (John, Mickael, Tom)
4 non-binding votes (Knowles, Chris S., Chris E., Arjun)

The vote is now closed. Other than updating the wiki, is there anything
additional I need to do vote-wise?

Knowles

On Mon, Nov 29, 2021 at 10:49 AM Tom Bentley  wrote:

> Hi Knowles,
>
> Thanks for the KIP, +1 (binding)
>
> Kind regards,
>
> Tom
>
> On 11/29/21, Mickael Maison  wrote:
> > Hi Knowles,
> >
> > +1 (binding)
> >
> > Thanks for the KIP!
> >
> > On Mon, Nov 29, 2021 at 12:56 PM Knowles Atchison Jr
> >  wrote:
> >>
> >> Good morning,
> >>
> >> Bringing this back to the top.
> >>
> >> We currently have
> >>
> >> 1 binding
> >> 4 non-binding
> >>
> >> Knowles
> >>
> >> On Fri, Nov 19, 2021 at 10:02 AM Knowles Atchison Jr
> >> 
> >> wrote:
> >>
> >> > Thank you all for voting. We still need two more binding votes.
> >> >
> >> > I have rebased and updated the PR to be ready to go once this vote
> >> > passes:
> >> >
> >> > https://github.com/apache/kafka/pull/11382
> >> >
> >> > Knowles
> >> >
> >> > On Tue, Nov 16, 2021 at 3:43 PM Chris Egerton
> >> > 
> >> > wrote:
> >> >
> >> >> +1 (non-binding). Thanks Knowles!
> >> >>
> >> >> On Tue, Nov 16, 2021 at 10:48 AM Arjun Satish <
> arjun.sat...@gmail.com>
> >> >> wrote:
> >> >>
> >> >> > +1 (non-binding). Thanks for the KIP, Knowles! and appreciate the
> >> >> > follow-ups!
> >> >> >
> >> >> > On Thu, Nov 11, 2021 at 2:55 PM John Roesler 
> >> >> wrote:
> >> >> >
> >> >> > > Thanks, Knowles!
> >> >> > >
> >> >> > > I'm +1 (binding)
> >> >> > >
> >> >> > > -John
> >> >> > >
> >> >> > > On Wed, 2021-11-10 at 12:42 -0500, Christopher Shannon
> >> >> > > wrote:
> >> >> > > > +1 (non-binding). This looks good to me and will be useful as a
> >> >> > > > way
> >> >> to
> >> >> > > > handle producer errors.
> >> >> > > >
> >> >> > > > On Mon, Nov 8, 2021 at 8:55 AM Knowles Atchison Jr <
> >> >> > > katchiso...@gmail.com>
> >> >> > > > wrote:
> >> >> > > >
> >> >> > > > > Good morning,
> >> >> > > > >
> >> >> > > > > I'd like to start a vote for KIP-779: Allow Source Tasks to
> >> >> > > > > Handle
> >> >> > > Producer
> >> >> > > > > Exceptions:
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > >
> >> >> >
> >> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-779%3A+Allow+Source+Tasks+to+Handle+Producer+Exceptions
> >> >> > > > >
> >> >> > > > > The purpose of this KIP is to allow Source Tasks the option
> to
> >> >> > "ignore"
> >> >> > > > > kafka producer exceptions. After a few iterations, this is
> now
> >> >> part
> >> >> > of
> >> >> > > the
> >> >> > > > > "errors.tolerance" configuration and provides a null
> >> >> RecordMetadata
> >> >> > to
> >> >> > > > > commitRecord() in lieu of a new SourceTask interface method
> or
> >> >> worker
> >> >> > > > > configuration item.
> >> >> > > > >
> >> >> > > > > PR is here:
> >> >> > > > >
> >> >> > > > > https://github.com/apache/kafka/pull/11382
> >> >> > > > >
> >> >> > > > > Any comments and feedback are welcome.
> >> >> > > > >
> >> >> > > > > Knowles
> >> >> > > > >
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> >
> >> >>
> >> >
> >
> >
>
>


Re: [DISCUSS] KIP-795: Add public APIs for AbstractCoordinator

2021-11-29 Thread Hector Geraldino (BLOOMBERG/ 919 3RD A)
Hello again Tom, kafka devs

First, congrats on becoming a PMC member! That's so cool.

Since your last reply I've updated the KIP to address some of your suggestions. A 
few more details have been added to the motivation section, and I also went ahead 
and opened a draft pull request with the changes I think are needed for this KIP, 
in the hope that it makes it easier to discuss the general approach and any other 
concerns the community may have.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator
PR: https://github.com/apache/kafka/pull/11515

Looking forward to some community feedback.

Regards,
Hector

From: dev@kafka.apache.org At: 11/11/21 17:15:17 UTC-5:00To:  
dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-795: Add public APIs for AbstractCoordinator

Hi Tom,

Thanks for taking the time to review the KIP. 

I think it's reasonable to ask whether Kafka's group coordination protocol should 
be used for use cases other than the distributed event log. This was actually 
briefly addressed by Gwen Shapira during her presentation at the Strange Loop 
conference in '18 (a link to the video is included in the KIP), in which she 
explains the protocol internals in greater detail.

We should also keep in mind that this protocol is already being used for other 
use cases outside of core Kafka: Confluent Schema Registry uses it to determine 
leadership between members of a cluster, Kafka Connect uses it for task 
assignments, Kafka Streams uses it for partition and task distribution, and so 
on. So having a public, stable API not just for new use cases (like ours) but 
for existing ones is IMHO a good thing to have. I'll amend the KIP and add a bit 
more detail to the motivation and alternatives sections, so the usefulness of 
this KIP is better understood.

Now, for the first point of your technical observations (regarding 
protocolTypes()), I don't think it matters in this context, as the protocol 
name and subtype are only relevant in the context of a consumer group and group 
rebalance. It really doesn't matter if two different libraries decide to name 
their protocols the same.

For item #2, I was under the impression that, because these classes all 
implement the org.apache.kafka.common.protocol.[Message, ApiMessage] interface, 
they are implicitly part of the Kafka protocol and the top-level API. Isn't 
that really the case?

And finally, for #3, the goal I had in mind when creating this KIP was a small 
one: to provide an interface that users can rely on when extending the 
AbstractCoordinator. So my thought was that, while the AbstractCoordinator 
itself uses some internal APIs (like ConsumerNetworkClient, ConsumerMetadata 
and so on), those can remain internal. But it probably makes sense to at least 
explore the possibility of moving the whole AbstractCoordinator class to be 
part of the public API. I'll do that exercise, see what it entails, and update 
the KIP with my findings.


Thanks again!
Hector


From: dev@kafka.apache.org At: 11/10/21 06:43:59 UTC-5:00To:  Hector Geraldino 
(BLOOMBERG/ 919 3RD A ) ,  dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-795: Add public APIs for AbstractCoordinator

Hi Hector,

Thanks for the KIP.

At a high level, I think the question to be answered by the community is
"Should Kafka really be providing this kind of cluster management API?".
While Kafka clients need this to provide their functionality it's a
different thing to expose that as a public API of the project, which is
otherwise about providing a distributed event log/data streaming
platform/. Having a public
API brings a significant commitment for API compatibility, which could
impair the ability of the project to change the API in order to make
improvements to the Kafka clients. The current AbstractCoordinator not
being a supported API means we don't currently have to reason about
compatibility here. So I think it would help the motivation section of the
KIP to describe in a bit more detail the use case(s) you have for
implementing your own coordinators. For example, are these applications
using Kafka otherwise, or just to leverage this API? And what alternatives
to implementing your own coordinators did you consider, and why did you
reject them?

From a technical point of view, there are a number of issues I think would
need addressing in order to do something like this:

1. There probably ought to be a way to ensure that protocolTypes() don't
collide, or at least reduce the chances of a collision. While probably
unlikely in practice the consequences of different protocols having the
same name could be pretty confusing to debug.
2. JoinGroupRequestData and JoinGroupResponseData are not public classes
(none of the *RequestData or *ResponseData classes are, intentionally), so
there would have to be an abstraction for them.
3. It's all well and good having an interface that anyone can implement,
but there is no supported Kafka API which takes an instanc

Filtering support on Fetch API

2021-11-29 Thread Talat Uyarer
Hi All,

I want to get your advice on one subject: I want to create a KIP for
header-based filtering on the Fetch API.

Our current use case: we have 1k+ topics and, per topic, 10+ consumers for
different use cases. However, the consumers are each interested in different
sets of messages on the same topic. Currently we read all messages from a
given topic and drop the unwanted ones on the consumer side. To reduce our
stream processing cost I want to drop them on the broker side. So far my
understanding is:

*Broker sends messages as-is (no serialization cost) -> network transfer ->
consumer deserializes messages (user-side deserialization cost) -> user space
drops or uses messages (user-side filtering cost)*


If I can drop messages based on their headers without serializing and
deserializing them, it will help us save network bandwidth as well as
consumer-side CPU cost.
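
For illustration, a minimal sketch of the consumer-side filtering done today 
(the topic, group and header names are made up); every record pays the transfer 
and deserialization cost before it can be dropped:
{code:java}
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HeaderFilteringConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "billing-use-case");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("shared-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // The record has already crossed the network and been deserialized
                    // before this header check can drop it.
                    Header h = record.headers().lastHeader("use-case");
                    if (h == null || !"billing".equals(new String(h.value(), StandardCharsets.UTF_8))) {
                        continue; // dropped on the consumer side
                    }
                    System.out.println("processing " + record.value());
                }
            }
        }
    }
}
{code}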

My approach is to build a header index. Consumer clients would define their
filter in the fetch call, and the broker would send only the messages that
match. I would like to hear your suggestions about this solution.

Thanks


Re: [DISCUSS] KIP-785 Automatic storage formatting

2021-11-29 Thread Igor Soarez
Hi all,

Bumping this thread as it’s been a while.

Looking forward to any kind of feedback, please take a look.

I created a short PR with a possible implementation - 
https://github.com/apache/kafka/pull/11549

--
Igor



> On 18 Oct 2021, at 15:11, Igor Soarez  wrote:
> 
> Hi all,
> 
> I'd like to propose that we simplify the operation of KRaft servers a bit by 
> removing the requirement to run kafka-storage.sh for new storage directories.
> 
> Please take a look at the KIP and provide your feedback:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-785%3A+Automatic+storage+formatting
> 
> --
> Igor
> 



Re: Handling retriable exceptions during Connect source task start

2021-11-29 Thread Chris Egerton
Hi Gunnar,

I think there's some risk of introducing this retry behavior if we end up
invoking Connector::start or Task::start on the same object multiple times.
Unexpected behavior may result, such as double-allocation of resources that
are initialized in the start method and which are meant to be released in
the stop method. An alternative could be to invoke stop on the object to
allow it to perform best-effort cleanup, then initialize an entirely new
Connector or Task instance, and invoke its start method.
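
For illustration, a hedged sketch of that restart pattern (framework-side 
pseudologic, not actual Connect runtime code; it omits initialize() and other 
runtime wiring):
{code:java}
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceTask;

// Sketch only: retry a failed start by discarding the failed instance and
// starting a brand-new one, rather than calling start() twice on the same object.
public final class TaskStartRetrier {
    public static SourceTask startWithRetries(Supplier<SourceTask> taskFactory,
                                              Map<String, String> config,
                                              int maxAttempts) throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            SourceTask task = taskFactory.get();
            try {
                task.start(config);
                return task;
            } catch (RetriableException e) {
                try {
                    task.stop(); // best-effort cleanup of anything start() allocated
                } catch (Exception ignored) {
                }
                if (attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(1000L * attempt); // simple linear backoff; a real policy would be configurable
            }
        }
    }
}
{code}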

As far as a KIP goes, I think one might be a good idea just to ensure that
the desired behavior is agreed upon and then protected as part of the
contract for the Connector/Task API. Otherwise, if this is implemented
without a KIP, we might just as easily roll things back or change the scope
of what constitutes a retriable exception without a KIP, which might be
frustrating for connector developers.

As a final note, if the approach proposed above (invoking stop on the
failed object, then reallocating a new one and invoking start on it) seems
reasonable, we might also consider using that kind of technique for a
general "automatic restart" feature that catches anything that causes a
connector or task to fail at the moment and tries to bring it back to life.

Cheers,

Chris

On Sun, Nov 28, 2021 at 10:26 PM Luke Chen  wrote:

> Hi Gunnar and Sergei,
> I think it's good to have a retriable exception handling during task#start.
>
> > are retriable exceptions during start disallowed by
> design, and the task must not throw retriable exceptions during start, or
> it's just currently not supported by the Connect framework and I just need
> to implement proper error handling in the connector?
>
> > Would it require a KIP?
>
> Sorry, I'm not sure if it's by design or not supported and needed to be
> implemented.
> But I guess if you want to implement the error handling in connector, you
> might leverage existing retry configuration, or you'll create a new one.
> Either way, I think it needs a small KIP as you mentioned, task#start is
> not covered in KIP-298. On the other hand, I think having a KIP first is
> good, to make sure you're on the right track, before you get your hand
> dirty. Besides, KIP discussion would have more attention, I think. :)
>
> Thank you.
> Luke
>
> On Fri, Nov 26, 2021 at 4:09 PM Gunnar Morling
>  wrote:
>
> > Hi all,
> >
> > We encountered a similar situation in Debezium again, where an exception
> > during Task::start() would be desirable to be retried.
> >
> > Would anything speak against implementing retriable support for
> > Task::start() in Kafka Connect? Would it require a KIP?
> >
> > Thanks,
> >
> > --Gunnar
> >
> >
> > Am Mo., 9. Aug. 2021 um 10:47 Uhr schrieb Gunnar Morling <
> > gunnar.morl...@googlemail.com>:
> >
> > > Hi,
> > >
> > > To ask slightly differently: would there be interest in a pull request
> > for
> > > implementing retries, in case RetriableException is thrown from the
> > > Task::start() method?
> > >
> > > Thanks,
> > >
> > > --Gunnar
> > >
> > >
> > > Am Do., 5. Aug. 2021 um 22:27 Uhr schrieb Sergei Morozov <
> moro...@tut.by
> > >:
> > >
> > >> Hi,
> > >>
> > >> I'm trying to address an issue in Debezium (DBZ-3823
> > >> ) where a source connector
> > >> task
> > >> cannot recover from a retriable exception.
> > >>
> > >> The root cause is that the task interacts with the source database
> > during
> > >> SourceTask#start but Kafka Connect doesn't handle retriable exceptions
> > >> thrown at this stage as retriable. KIP-298
> > >> <
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> > >> >
> > >> that
> > >> originally introduced handling of retriable exception doesn't describe
> > >> handling task start exceptions, so it's unclear to me whether those
> > aren't
> > >> allowed by design or it was just out of the scope of the KIP.
> > >>
> > >> My current working solution
> > >>  relies
> > >> on the internal Debezium implementation of the task restart which
> > >> introduces certain risks (the details are in the PR description).
> > >>
> > >> The question is: are retriable exceptions during start disallowed by
> > >> design, and the task must not throw retriable exceptions during start,
> > or
> > >> it's just currently not supported by the Connect framework and I just
> > need
> > >> to implement proper error handling in the connector?
> > >>
> > >> Thanks!
> > >>
> > >> --
> > >> Sergei Morozov
> > >>
> > >
> >
>


Build failed in Jenkins: Kafka » Kafka Branch Builder » 3.1 #25

2021-11-29 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 501254 lines...]
[2021-11-29T21:48:19.780Z] GetOffsetShellTest > testTopicPartitionsArg() STARTED
[2021-11-29T21:48:23.401Z] 
[2021-11-29T21:48:23.401Z] GetOffsetShellTest > testTopicPartitionsArg() PASSED
[2021-11-29T21:48:23.401Z] 
[2021-11-29T21:48:23.401Z] GetOffsetShellTest > testInternalExcluded() STARTED
[2021-11-29T21:48:28.063Z] 
[2021-11-29T21:48:28.063Z] GetOffsetShellTest > testInternalExcluded() PASSED
[2021-11-29T21:48:28.063Z] 
[2021-11-29T21:48:28.063Z] GetOffsetShellTest > 
testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern() STARTED
[2021-11-29T21:48:31.679Z] 
[2021-11-29T21:48:31.679Z] GetOffsetShellTest > 
testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern() PASSED
[2021-11-29T21:48:31.679Z] 
[2021-11-29T21:48:31.679Z] GetOffsetShellTest > 
testTopicPartitionsNotFoundForExcludedInternalTopic() STARTED
[2021-11-29T21:48:36.339Z] 
[2021-11-29T21:48:36.339Z] GetOffsetShellTest > 
testTopicPartitionsNotFoundForExcludedInternalTopic() PASSED
[2021-11-29T21:48:36.339Z] 
[2021-11-29T21:48:36.339Z] DeleteTopicTest > testDeleteTopicWithCleaner() 
STARTED
[2021-11-29T21:49:12.028Z] 
[2021-11-29T21:49:12.028Z] DeleteTopicTest > testDeleteTopicWithCleaner() PASSED
[2021-11-29T21:49:12.028Z] 
[2021-11-29T21:49:12.028Z] DeleteTopicTest > 
testResumeDeleteTopicOnControllerFailover() STARTED
[2021-11-29T21:49:17.848Z] 
[2021-11-29T21:49:17.848Z] DeleteTopicTest > 
testResumeDeleteTopicOnControllerFailover() PASSED
[2021-11-29T21:49:17.848Z] 
[2021-11-29T21:49:17.848Z] DeleteTopicTest > 
testResumeDeleteTopicWithRecoveredFollower() STARTED
[2021-11-29T21:49:22.522Z] 
[2021-11-29T21:49:22.522Z] DeleteTopicTest > 
testResumeDeleteTopicWithRecoveredFollower() PASSED
[2021-11-29T21:49:22.522Z] 
[2021-11-29T21:49:22.522Z] DeleteTopicTest > 
testDeleteTopicAlreadyMarkedAsDeleted() STARTED
[2021-11-29T21:49:26.151Z] 
[2021-11-29T21:49:26.151Z] DeleteTopicTest > 
testDeleteTopicAlreadyMarkedAsDeleted() PASSED
[2021-11-29T21:49:26.151Z] 
[2021-11-29T21:49:26.151Z] DeleteTopicTest > 
testIncreasePartitionCountDuringDeleteTopic() STARTED
[2021-11-29T21:49:34.845Z] 
[2021-11-29T21:49:34.845Z] DeleteTopicTest > 
testIncreasePartitionCountDuringDeleteTopic() PASSED
[2021-11-29T21:49:34.845Z] 
[2021-11-29T21:49:34.845Z] DeleteTopicTest > 
testPartitionReassignmentDuringDeleteTopic() STARTED
[2021-11-29T21:49:40.665Z] 
[2021-11-29T21:49:40.665Z] DeleteTopicTest > 
testPartitionReassignmentDuringDeleteTopic() PASSED
[2021-11-29T21:49:40.665Z] 
[2021-11-29T21:49:40.665Z] DeleteTopicTest > testDeleteNonExistingTopic() 
STARTED
[2021-11-29T21:49:44.293Z] 
[2021-11-29T21:49:44.293Z] DeleteTopicTest > testDeleteNonExistingTopic() PASSED
[2021-11-29T21:49:44.293Z] 
[2021-11-29T21:49:44.293Z] DeleteTopicTest > testRecreateTopicAfterDeletion() 
STARTED
[2021-11-29T21:49:47.920Z] 
[2021-11-29T21:49:47.920Z] DeleteTopicTest > testRecreateTopicAfterDeletion() 
PASSED
[2021-11-29T21:49:47.920Z] 
[2021-11-29T21:49:47.920Z] DeleteTopicTest > testDisableDeleteTopic() STARTED
[2021-11-29T21:49:51.547Z] 
[2021-11-29T21:49:51.547Z] DeleteTopicTest > testDisableDeleteTopic() PASSED
[2021-11-29T21:49:51.547Z] 
[2021-11-29T21:49:51.547Z] DeleteTopicTest > 
testAddPartitionDuringDeleteTopic() STARTED
[2021-11-29T21:49:54.213Z] 
[2021-11-29T21:49:54.213Z] DeleteTopicTest > 
testAddPartitionDuringDeleteTopic() PASSED
[2021-11-29T21:49:54.213Z] 
[2021-11-29T21:49:54.213Z] DeleteTopicTest > 
testDeleteTopicWithAllAliveReplicas() STARTED
[2021-11-29T21:49:57.832Z] 
[2021-11-29T21:49:57.832Z] DeleteTopicTest > 
testDeleteTopicWithAllAliveReplicas() PASSED
[2021-11-29T21:49:57.832Z] 
[2021-11-29T21:49:57.832Z] DeleteTopicTest > 
testDeleteTopicDuringAddPartition() STARTED
[2021-11-29T21:50:03.642Z] 
[2021-11-29T21:50:03.642Z] DeleteTopicTest > 
testDeleteTopicDuringAddPartition() PASSED
[2021-11-29T21:50:03.642Z] 
[2021-11-29T21:50:03.642Z] DeleteTopicTest > 
testDeletingPartiallyDeletedTopic() STARTED
[2021-11-29T21:50:12.140Z] 
[2021-11-29T21:50:12.140Z] DeleteTopicTest > 
testDeletingPartiallyDeletedTopic() PASSED
[2021-11-29T21:50:12.140Z] 
[2021-11-29T21:50:12.140Z] AddPartitionsTest > testReplicaPlacementAllServers() 
STARTED
[2021-11-29T21:50:15.759Z] 
[2021-11-29T21:50:15.759Z] AddPartitionsTest > testReplicaPlacementAllServers() 
PASSED
[2021-11-29T21:50:15.759Z] 
[2021-11-29T21:50:15.759Z] AddPartitionsTest > testMissingPartition0() STARTED
[2021-11-29T21:50:20.421Z] 
[2021-11-29T21:50:20.421Z] AddPartitionsTest > testMissingPartition0() PASSED
[2021-11-29T21:50:20.421Z] 
[2021-11-29T21:50:20.421Z] AddPartitionsTest > testWrongReplicaCount() STARTED
[2021-11-29T21:50:24.038Z] 
[2021-11-29T21:50:24.038Z] AddPartitionsTest > testWrongReplicaCount() PASSED
[2021-11-29T21:50:24.038Z] 
[2021-11-29T21:50:24.038Z] AddPartitionsTest > 
testReplicaPlacementPartialServers() 

Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.0 #157

2021-11-29 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-778 KRaft Upgrades

2021-11-29 Thread David Arthur
Jun, I updated the KIP with the "disable" CLI.

For 16, I think you're asking how we can safely introduce the
initial version of other feature flags in the future. This will probably
depend on the nature of the feature that the new flag is gating. We can
probably take a similar approach and say version 1 is backwards compatible
to some point (possibly an IBP or metadata.version?).

-David

On Fri, Nov 19, 2021 at 3:00 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the reply. The new CLI sounds reasonable to me.
>
> 16.
> For case C, choosing the latest version sounds good to me.
> For case B, for metadata.version, we pick version 1 since it just happens
> that for metadata.version version 1 is backward compatible. How do we make
> this more general for other features?
>
> 21. Do you still plan to change "delete" to "disable" in the CLI?
>
> Thanks,
>
> Jun
>
>
>
> On Thu, Nov 18, 2021 at 11:50 AM David Arthur
>  wrote:
>
> > Jun,
> >
> > The KIP has some changes to the CLI for KIP-584. With Jason's suggestion
> > incorporated, these three commands would look like:
> >
> > 1) kafka-features.sh upgrade --release latest
> > upgrades all known features to their defaults in the latest release
> >
> > 2) kafka-features.sh downgrade --release 3.x
> > downgrade all known features to the default versions from 3.x
> >
> > 3) kafka-features.sh describe --release latest
> > Describe the known features of the latest release
> >
> > The --upgrade-all and --downgrade-all are replaced by specific each
> > feature+version or giving the --release argument. One problem with
> > --downgrade-all is it's not clear what the target versions should be: the
> > previous version before the last upgrade -- or the lowest supported
> > version. Since downgrades will be less common, I think it's better to
> make
> > the operator be more explicit about the desired downgrade version (either
> > through [--feature X --version Y] or [--release 3.1]). Does that seem
> > reasonable?
> >
> > 16. For case C, I think we will want to always use the latest
> > metadata.version for new clusters (like we do for IBP). We can always
> > change what we mean by "default" down the road. Also, this will likely
> > become a bit of information we include in release and upgrade notes with
> > each release.
> >
> > -David
> >
> > On Thu, Nov 18, 2021 at 1:14 PM Jun Rao 
> wrote:
> >
> > > Hi, Jason, David,
> > >
> > > Just to clarify on the interaction with the end user, the design in
> > KIP-584
> > > allows one to do the following.
> > > (1) kafka-features.sh  --upgrade-all --bootstrap-server
> > > kafka-broker0.prn1:9071 to upgrade all features to the latest version
> > known
> > > by the tool. The user doesn't need to know a specific feature version.
> > > (2) kafka-features.sh  --downgrade-all --bootstrap-server
> > > kafka-broker0.prn1:9071 to downgrade all features to the latest version
> > > known by the tool. The user doesn't need to know a specific feature
> > > version.
> > > (3) kafka-features.sh  --describe --bootstrap-server
> > > kafka-broker0.prn1:9071 to find out the supported version for each
> > feature.
> > > This allows the user to upgrade each feature individually.
> > >
> > > Most users will be doing (1) and occasionally (2), and won't need to
> know
> > > the exact version of each feature.
> > >
> > > 16. For case C, what's the default version? Is it always the latest?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Nov 18, 2021 at 8:15 AM David Arthur
> > >  wrote:
> > >
> > > > Colin, thanks for the detailed response. I understand what you're
> > saying
> > > > and I agree with your rationale.
> > > >
> > > > It seems like we could just initialize their cluster.metadata to 1
> when
> > > the
> > > > > software is upgraded to 3.2.
> > > > >
> > > >
> > > > Concretely, this means the controller would see that there is no
> > > > FeatureLevelRecord in the log, and so it would bootstrap one.
> Normally
> > > for
> > > > cluster initialization, this would be read from meta.properties, but
> in
> > > the
> > > > case of preview clusters we would need to special case it to
> initialize
> > > the
> > > > version to 1.
> > > >
> > > > Once the new FeatureLevelRecord has been committed, any nodes
> (brokers
> > or
> > > > controllers) that are running the latest software will react to the
> new
> > > > metadata.version. This means we will need to make this initial
> version
> > > of 1
> > > > be backwards compatible to what we have in 3.0 and 3.1 (since some
> > > brokers
> > > > will be on the new software and some on older/preview versions)
> > > >
> > > > I agree that auto-upgrading preview users from IBP 3.0 to
> > > metadata.version
> > > > 1 (equivalent to IBP 3.2) is probably fine.
> > > >
> > > > Back to Jun's case B:
> > > >
> > > > b. We upgrade from an old version where no metadata.version has been
> > > > > finalized. In this case, it makes sense to leave metadata.version
> > > > disabled
> > > > > since we don't kno

Re: [DISCUSS] KIP-782: Expandable batch size in producer

2021-11-29 Thread Artem Livshits
Hi Luke,

I don't mind increasing the max.request.size to a higher number, e.g. 2MB
could be good.  I think we should also run some benchmarks to see the
effects of different sizes.
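
To make the knobs concrete, here is a minimal sketch of the producer
settings in play. Note that "batch.max.size" is the config proposed by this
KIP and does not exist in released clients yet, and the host and sizes are
placeholders for whatever the benchmarks suggest:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class Kip782ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder host
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // Existing config: the threshold at which a batch is considered
        // "ready" to be sent.
        props.put("batch.size", "16384");
        // Proposed by KIP-782 (name and default still under discussion):
        // the upper bound a single batch may grow to.
        props.put("batch.max.size", "65536");
        // Raised from the 1MB default so that several "ready" batches can
        // still fit into one request.
        props.put("max.request.size", "2097152");
        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props)) {
            // ... send records as usual ...
        }
    }
}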

I agree that changing round-robin to random selection solves an independent,
pre-existing issue; however, the logic in this KIP exacerbates that issue,
so there is some dependency.

-Artem

On Wed, Nov 24, 2021 at 12:43 AM Luke Chen  wrote:

> Hi Artem,
> Yes, I agree that if we go with random selection instead of round-robin
> selection, latency will be spread more fairly. That is, if there are 10
> partitions, the 10th partition will always be the last choice in each round
> in the current design, but with random selection the chance of being
> selected is even across partitions.
>
> However, I think that's somewhat out of scope for this KIP. This is an
> existing issue, and it might need further discussion to decide if this
> change is necessary.
>
> I agree the default of 32KB for "batch.max.size" might not be a huge
> improvement compared with 256KB. I'm thinking we could default
> "batch.max.size" to 64KB, and make the documentation clear that if
> "batch.max.size" is increased, there is a chance that "ready" partitions
> will need to wait for the next request to the broker because of the
> "max.request.size" (default 1MB) limitation. "max.request.size" could also
> be increased to avoid this issue. What do you think?
>
> Thank you.
> Luke
>
> On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
>  wrote:
>
> > >  maybe I can firstly decrease the "batch.max.size" to 32KB
> >
> > I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> > produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> > partition.  We should probably set up some testing and see if 256KB has
> > problems.
> >
> > To illustrate latency dynamics, let's consider a simplified model: 1
> > in-flight request per broker, produce latency 125ms, 256KB max request
> > size, 16 partitions assigned to the same broker, every second 128KB is
> > produced to each partition (total production rate is 2MB/sec).
> >
> > If the batch size is 16KB, then the pattern would be the following:
> >
> > 0ms - produce 128KB into each partition
> > 0ms - take 16KB from each partition send (total 256KB)
> > 125ms - complete first 16KB from each partition, send next 16KB
> > 250ms - complete second 16KB, send next 16KB
> > ...
> > 1000ms - complete 8th 16KB from each partition
> >
> > from this model it's easy to see that there are 256KB that are sent
> > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > 875ms.
> >
> > If the batch size is 256KB, then the pattern would be the following:
> >
> > 0ms - produce 128KB into each partition
> > 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> > 125ms - complete 2 first partitions, send data from next 2 partitions
> > ...
> > 1000ms - complete last 2 partitions
> >
> > even though the pattern is different, there are still 256KB that are sent
> > immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> > 875ms.
> >
> > Now, in this example if we do strictly round-robin (current
> implementation)
> > and we have this exact pattern (not sure how often such regular pattern
> > would happen in practice -- I would expect that it would be a bit more
> > random), some partitions would experience higher latency than others (not
> > sure how much it would matter in practice -- in the end of the day some
> > bytes produced to a topic would have higher latency and some bytes would
> > have lower latency).  This pattern is easily fixed by choosing the next
> > partition randomly instead of using round-robin.
> >
> > -Artem
> >
> > On Tue, Nov 23, 2021 at 12:08 AM Luke Chen  wrote:
> >
> > > Hi Tom,
> > > Thanks for your comments. And thanks for Artem's explanation.
> > > Below is my response:
> > >
> > > > Currently because buffers are allocated using batch.size it means we
> > can
> > > handle records that are that large (e.g. one big record per batch).
> > Doesn't
> > > the introduction of smaller buffer sizes (batch.initial.size) mean a
> > > corresponding decrease in the maximum record size that the producer can
> > > handle?
> > >
> > > Actually, the "batch.size" is only like a threshold to decide if the
> > batch
> > > is "ready to be sent". That is, even if you set the "batch.size=16KB"
> > > (default value), users can still send one record sized with 20KB, as
> long
> > > as the size is less than "max.request.size" in producer (default 1MB).
> > > Therefore, the introduction of "batch.initial.size" won't decrease the
> > > maximum record size that the producer can handle.
> > >
> > > > But isn't there the risk that drainBatchesForOneNode would end up not
> > > sending ready
> > > batches well past when they ought to be sent (according to their
> > linger.ms
> > > ),
> > > because it's sending buffers for earlier partitions too aggressively?
> > >
> > > Did you mean that we ha

Re: KIP-769: Connect API to retrieve connector configuration definitions

2021-11-29 Thread Chris Egerton
Hi Mickael,

I think that's a great idea! I especially like how we can establish the
expectation that any plugin type that appears in the response from the GET
/connector-plugins endpoint will have a corresponding GET
/connector-plugins/<plugin>/config endpoint, while worker plugins (if we
decide to add them in the future) won't be expected to expose this kind of
information; the different root path gives a decent hint about that.

I also like the choice to return an empty ConfigDef from Converter::config
instead of null.
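
For illustration, a minimal sketch of how that default could look. The
method name follows the KIP draft, and the real interface in
org.apache.kafka.connect.storage also carries Javadoc and the header-aware
default methods, which are omitted here:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public interface Converter {

    void configure(Map<String, ?> configs, boolean isKey);

    byte[] fromConnectData(String topic, Schema schema, Object value);

    SchemaAndValue toConnectData(String topic, byte[] value);

    // Proposed in KIP-769: a default implementation keeps every existing
    // Converter source- and binary-compatible, and an empty ConfigDef
    // (rather than null) signals "no declared configs".
    default ConfigDef config() {
        return new ConfigDef();
    }
}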

Two things come to mind:

1. We may want to gate this behind a URL query parameter (maybe something
like "connectorsOnly") that defaults to the old behavior in order to avoid
breaking existing tools such as programmatic UIs that use the endpoint
today to discover the connectors that can be created by the user. We can
even plan to change the default for that parameter to the newly-proposed
behavior in the next major release, which should give people enough time to
either adapt to the expanded response format or add the query parameter to
their tooling.
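
Concretely, and assuming the parameter keeps the "connectorsOnly" name, the
gate might look like:

  GET /connector-plugins                      -> connectors only (today's behavior)
  GET /connector-plugins?connectorsOnly=false -> connectors plus the other
                                                 connector-level plugins
                                                 (converters, transformations, ...)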

2. The existing GET /connector-plugins endpoint doesn't contain information
on the location of the plugin on the worker's file system. Do you think we
should still include this info in the new response format? Correct me if
I'm wrong but it seems it may have been proposed originally to help prevent
already-addressed bugs in Connect classloading from striking; all else
equal, I'd personally err on the side of leaving this info out or at least
reducing permitted values for it to just "classpath" or "plugin path" in
order to avoid leaking worker file paths into the REST API, which might
bother super security-conscious users.
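
For example, an entry in the expanded response could stay as minimal as the
sketch below (not the KIP's final schema; the "location" field shown last is
the hypothetical one I'd drop or restrict):

  {
    "class": "org.apache.kafka.connect.json.JsonConverter",
    "type": "converter",
    "version": "3.1.0",
    "location": "classpath"
  }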

Cheers,

Chris

On Mon, Nov 29, 2021 at 5:52 AM Mickael Maison 
wrote:

> Hi Chris,
>
> Yes to keep compatibility we want a default implementation for
> Converter.configs(), I've updated the KIP.
>
> Regarding worker plugins, the use case you described seems valuable.
> I'd prefer not mixing worker and connector plugins on the same
> endpoint but I agree using /plugins and /worker-plugins could be
> confusing.
>
> One alternative is to expose all connector-level plugins via the
> existing /connector-plugins endpoint. In that case, we'd need to keep
> the current JSON schema and not group plugins by type. As the current
> schema already has a type field for each entry, we'll still be able to
> tell them apart. Then we can have /worker-plugins and a relatively
> clean API. What do you think?
>
> Thanks,
> Mickael
>
> On Sun, Nov 28, 2021 at 8:21 PM Chris Egerton
>  wrote:
> >
> > Hi Mickael,
> >
> > I think one potential use case for exposing worker-level plugins is that
> it
> > may make it easier to determine whether a worker is set up correctly (the
> > current alternative requires looking through log files and can be a
> little
> > tedious), and might even make it possible to automatically identify
> > discrepancies within a cluster by diffing the contents of that endpoint
> > across each worker. But I don't think this has to be addressed by the
> > current KIP; the only thing that bothers me a little is that "plugins" is
> > generic and it may confuse people down the road if we add an endpoint for
> > worker-level plugins ("why is one just called 'plugins' and the other one
> > is 'worker-plugins'?"). Probably not worth blocking on, though.
> >
> > Agreed that the suggestion for improved validation should be made on the
> > KIP-802 thread.
> >
> > I also noticed that the newly-proposed config method for the Converter
> > interface doesn't have a default implementation, making it
> > backwards-incompatible. Should we add a default implementation that
> returns
> > either null or an empty ConfigDef?
> >
> > Cheers,
> >
> > Chris
> >
> > On Fri, Nov 26, 2021 at 8:35 AM Mickael Maison  >
> > wrote:
> >
> > > Hi Chris,
> > >
> > > 1. If we want to expose worker plugins, I think we should do it via a
> > > separate endpoint. But to be honest, I'm not even sure I see strong
> > > use cases for exposing them as they are either enabled or not and
> > > can't be changed at runtime. So I'd prefer to stick to "connector
> > > level" plugins in this KIP. Let me now if you have use cases, I'm open
> > > to reconsider this choice.
> > > I'll add that in the rejected alternatives section for now
> > >
> > > 2. I remembered seeing issues in the past with multiple plugin.path
> > > entries but I tried today and I was able to mix and match plugins from
> > > different paths. So my bad for getting confused.
> > > Then I agree, it makes more sense to group them by plugin type.
> > >
> > > 3. Yes this should be covered in KIP-802:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-802%3A+Validation+Support+for+Kafka+Connect+SMT+Options
> > >
> > > 4. No particular reason. We can support both formats like today. I've
> > > updated the KIP
> > >
> > > Thanks,
> > > Mickael
> > >
> > >
> > >
> > > On Tue, Nov 23, 2021 at 6:40 

Re: [VOTE] KIP-799: Align behaviour for producer callbacks with documented behaviour

2021-11-29 Thread John Roesler
Thanks, Séamus!

I'm +1 (binding).

On Mon, 2021-11-29 at 16:14 +, Séamus Ó Ceanainn wrote:
> Hi everyone,
> 
> I'd like to start a vote for KIP-799: Align behaviour for producer
> callbacks with documented behaviour
> 
> .
> 
> The KIP proposes a breaking change in the behaviour of producer client
> callbacks. The breaking change would align the behaviour of callbacks with
> the documented behaviour for the method, and makes it consistent with
> similar methods for producer client interceptors.
> 
> Regards,
> Séamus.



Re: [DISCUSS] KIP-805: Add range and scan query support in IQ v2

2021-11-29 Thread John Roesler
Thanks for the KIP, Vicky!

This KIP will help fill in the parity gap between IQ and
IQv2.

One thing I noticed, which looks like just a typo, is that
the value type of the proposed RangeQuery should probably be
KeyValueIterator, right?
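
In other words, I'd expect a declaration along these lines. This is a sketch
of the shape only; the field layout and factory method are assumptions, and
the KIP may name things differently:

import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.state.KeyValueIterator;

public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {

    private final K lower; // null = unbounded below
    private final K upper; // null = unbounded above

    private RangeQuery(final K lower, final K upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // Hypothetical factory method; names are up to the KIP.
    public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
        return new RangeQuery<>(lower, upper);
    }
}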

Otherwise, it looks good to me!

Thanks,
-John

On Mon, 2021-11-29 at 12:20 +, Vasiliki Papavasileiou
wrote:
> Hello everyone,
> 
> I would like to start the discussion for KIP-805: Add range and scan query
> support in IQ v2
> 
> The KIP can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+support+in+IQ+v2
> 
> Any suggestions are more than welcome.
> 
> Many thanks,
> Vicky