Re: [DISCUSS] KIP-1043: Administration of groups

2024-07-19 Thread Andrew Schofield
Hi Apoorv,
Thanks for your comments.

AM1: I chose to leave the majority of the administration for the different
types of groups in their own tools. The differences between the group
types are significant and I think that one uber tool that subsumes
kafka-consumer-groups.sh, kafka-share-groups.sh and
kafka-streams-application-reset.sh would be too overwhelming and
difficult to use. For example, the output from describing a consumer group
is not the same as the output from describing a share group.

AM2: I think you’re highlighting some of the effects of the evolution
of groups. The classic consumer group protocol defined the idea
of protocol as a way of distinguishing between the various ways people
had extended the base protocol - “consumer”, “connect”, and “sr” are the
main ones I’ve seen, and the special “” for groups that are not using
member assignment.

For the modern group protocol, each of the proposed implementations
brings its own use of the protocol string - “consumer”, “share” and “streams”.

Now, prior to AK 4.0, in order to make the console consumer use the
new group protocol, you set `--consumer-property group.protocol=consumer`.
This tells a factory method in the consumer to use the AsyncKafkaConsumer
(group type is Consumer, protocol is “consumer”) as opposed to the
LegacyKafkaConsumer (group type is Classic, protocol is “consumer”).
In AK 4.0, the default group protocol will change and setting the property
will not be necessary. The name of the configuration “group.protocol”
is slightly misleading. In practice, this is most likely to be used pre-AK 4.0
by people wanting to try out the new consumer.
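
For illustration only, here is a minimal Python sketch of the selection described in AM2. It is not the real factory code: the function name `select_consumer_impl` and its `default_protocol` parameter are hypothetical, and only the mapping from `group.protocol` to implementation, group type and protocol string is taken from the text above.

```python
def select_consumer_impl(props, default_protocol="classic"):
    """Return (implementation, group type, protocol string) for a config dict.

    Hypothetical helper: it only mirrors the mapping described in the mail.
    """
    protocol = props.get("group.protocol", default_protocol).lower()
    if protocol == "consumer":
        # New group protocol: group type Consumer, protocol "consumer".
        return ("AsyncKafkaConsumer", "Consumer", "consumer")
    if protocol == "classic":
        # Classic group protocol: group type Classic, protocol still "consumer".
        return ("LegacyKafkaConsumer", "Classic", "consumer")
    raise ValueError(f"unknown group.protocol: {protocol}")


# Opting in explicitly, as with --consumer-property group.protocol=consumer.
print(select_consumer_impl({"group.protocol": "consumer"}))
# With no property set, the (assumed) default decides the implementation.
print(select_consumer_impl({}))
```

Both branches report the protocol string “consumer”, which is why the group type, not the protocol, is what distinguishes the two kinds of group in a listing.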

AM3: When you try to create a share group and that group ID is already
in use by another type of group, the error message is “Group CG1 is not
a share group”. It exists already, with the wrong type.

AM4: This KIP changes the error behaviour for `kafka-consumer-groups.sh`
and `kafka-share-groups.sh` such that any operation on a group that finds the
group type is incorrect reports “Error: Group XXX is not a consumer group” or
equivalent for the other group types. This change makes things much easier to
understand than they are today.
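
As a rough sketch of AM3 and AM4 (not the broker's actual code), assume a single map from group ID to group type, as Apoorv describes for GroupMetadataManager. The class `GroupRegistry`, its methods and `GroupTypeMismatch` are hypothetical names; only the wording of the error message comes from the text above.

```python
class GroupTypeMismatch(Exception):
    """Hypothetical error: the group ID exists, but with another type."""


class GroupRegistry:
    """Toy registry: group IDs are unique across all group types."""

    def __init__(self):
        self._types = {}  # group ID -> group type, e.g. "consumer" or "share"

    def _check(self, group_id, expected_type):
        actual = self._types.get(group_id)
        if actual is not None and actual != expected_type:
            raise GroupTypeMismatch(
                f"Group {group_id} is not a {expected_type} group")
        return actual

    def create(self, group_id, group_type):
        # Creating over an existing ID of another type reports the type mismatch.
        self._check(group_id, group_type)
        self._types.setdefault(group_id, group_type)

    def describe(self, group_id, expected_type):
        # Any operation from a type-specific tool performs the same check.
        if self._check(group_id, expected_type) is None:
            raise KeyError(group_id)
        return {"group": group_id, "type": expected_type}


registry = GroupRegistry()
registry.create("CG1", "consumer")
try:
    registry.create("CG1", "share")
except GroupTypeMismatch as err:
    print(f"Error: {err}")
```

The point of the sketch is that the same check serves both the create path and the describe path, so every tool reports the mismatch in the same way.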

AM5: That section is just clarifying what the behaviour is. I don’t think it had
been written down before.

Thanks,
Andrew

> On 18 Jul 2024, at 16:43, Apoorv Mittal  wrote:
>
> Hi Andrew,
> Thanks for the KIP. The group administration is getting difficult with new
> types of groups being added and certainly the proposal looks great.
> I have some questions:
>
> AM1: The current proposal defines the behaviour for listing and describing
> groups, and simplifies create for `kafka-share-groups.sh`. Do we want to leave
> the other group administration operations, such as delete, to the respective
> tools (kafka-consumer-groups.sh or kafka-share-groups.sh), or should common
> behaviour be defined?
>
> AM2: The notes on KIP-848,
> https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes,
> require `--consumer-property group.protocol=consumer` to enable the
> modern consumer group. But the listing for the `classic` "type" also shows
> "protocol" as `consumer` in some scenarios. Is that intended, or should the
> `classic` type have a different protocol?
>
> AM3: The KIP adds behaviour for  `kafka-share-groups.sh` which defines the
> `--create` option. Though it simplifies group creation, what should be the
> error behaviour when the group with the same name exists but not of "share"
> group type?
>
> AM4: GroupMetadataManager.java stores all groups in the same data
> structure, which means the name has to be unique across different group
> types. Do you think we should also change the error message for the existing
> kafka-consumer-groups.sh and kafka-share-groups.sh to recommend using the new
> kafka-groups.sh for a complete list of groups? Currently the individual
> scripts will report "Group already exists" when creating new groups, but
> listing with the respective utility will not show the group.
>
> AM5: The KIP defines compatibility considerations for the ListGroups RPC,
> but it's unclear to me why they are needed. The client and server supporting
> `kafka-groups.sh` will both be post ListGroups v5, so TypesFilter
> will be supported in both. Am I missing something here?
>
> Regards,
> Apoorv Mittal
>
>
> On Wed, Jul 17, 2024 at 6:26 PM Andrew Schofield 
> wrote:
>
>> Hi Lianet,
>> Thanks for your comments.
>>
>> LM5. Unfortunately, the protocol type has to be a string rather than
>> an enumeration. This is because when people have created their own
>> extensions of the classic consumer group protocol, they have chosen
>> their own protocol strings. For example, the Confluent schema registry
>> uses “sr” and there are other examples in the wild.
>>
>> LM6.1. It’s because of the difference between a parameterised
>> type and a raw type.
>>
>> I

[jira] [Resolved] (KAFKA-17121) junit-platform.properties files in published artifacts pollute the test classpath of consumers

2024-07-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17121.

Fix Version/s: 3.9.0
   Resolution: Fixed

> junit-platform.properties files in published artifacts pollute the test 
> classpath of consumers
> --
>
> Key: KAFKA-17121
> URL: https://issues.apache.org/jira/browse/KAFKA-17121
> Project: Kafka
> Issue Type: Bug
> Components: clients, core
> Affects Versions: 3.7.1
> Reporter: Andy Wilkinson
> Assignee: kangning.li
> Priority: Major
> Fix For: 3.9.0
>
>
> https://github.com/apache/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d
>  added {{junit-platform.properties}} files to the test-classified 
> kafka-clients and kafka-server-common artifacts. When a consumer is using 
> these artifacts for its own Kafka-related tests, a warning is logged when the 
> tests are launched:
> {code}
> Jul 11, 2024 10:26:21 AM 
> org.junit.platform.launcher.core.LauncherConfigurationParameters 
> loadClasspathResource
> WARNING: Discovered 2 'junit-platform.properties' configuration files in the 
> classpath; only the first will be used.
> {code}
> The fact that only the first will be used is potentially problematic. 
> Depending on the ordering of the classpath, it may result in a consumer's own 
> properties being ignored and Kafka's being used instead.
> Can the {{junit-platform.properties}} files be removed from the published 
> artifacts? As far as I can tell, they were only intended to affect Kafka's 
> own tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17132) Revisit testMissingOffsetNoResetPolicy for AsyncConsumer

2024-07-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17132.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Revisit testMissingOffsetNoResetPolicy for AsyncConsumer
> 
>
> Key: KAFKA-17132
> URL: https://issues.apache.org/jira/browse/KAFKA-17132
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Chia-Ping Tsai
> Assignee: TaiJuWu
> Priority: Major
> Fix For: 3.9.0
>
>
> `testMissingOffsetNoResetPolicy` assumes `poll(0)` can complete the request
> in time, but the spec does not guarantee that. Hence, we should increase the
> timeout to enable the test for AsyncConsumer.





Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol

2024-07-19 Thread Lucas Brutschy
Hi Andrew,

thanks for getting the discussion going! Here are my responses.

AS1: Good point, done.

AS2: We were planning to add more administrative tools to the
interface in a follow-up KIP, to not make this KIP too large. If
people think that it would help to understand the overall picture if
we already add something like `kafka-streams-groups.sh`, we will do
that. I also agree that we should address how this relates to
KIP-1043, we'll add it.

AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.

AS4: Thanks, Fixed.

AS5: Good catch. This was supposed to mean that we require CREATE on
cluster or CREATE on all topics, not both. Fixed.
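
To make the "either, not both" rule in AS5 concrete, here is a small Python sketch. It is an assumption-laden illustration, not the authorizer's real logic: the function name and the tuple representation of grants are hypothetical, and the topic names in the usage lines are made up.

```python
def may_create_internal_topics(granted, topics):
    """Return True if CREATE is granted on the cluster OR on every topic.

    granted: set of (operation, resource type, resource name) tuples
             (a hypothetical representation of the caller's ACLs).
    topics:  names of the internal topics the request would create.
    """
    if ("CREATE", "CLUSTER", "kafka-cluster") in granted:
        return True
    # Without the cluster-level grant, each topic needs its own grant.
    # Note that an empty topic list is trivially allowed.
    return all(("CREATE", "TOPIC", name) in granted for name in topics)


needed = ["app-changelog", "app-repartition"]  # made-up topic names
cluster_only = {("CREATE", "CLUSTER", "kafka-cluster")}
topics_only = {("CREATE", "TOPIC", name) for name in needed}
partial = {("CREATE", "TOPIC", "app-changelog")}

print(may_create_internal_topics(cluster_only, needed))  # cluster grant suffices
print(may_create_internal_topics(topics_only, needed))   # per-topic grants suffice
print(may_create_internal_topics(partial, needed))       # one topic is missing
```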

AS6: Thanks, Fixed.

AS7. Thanks, Fixed.

AS8: I think this works a bit differently in this KIP than in consumer
groups. KIP-848 lets the members vote for a preferred assignor, and
the broker-side assignor is picked by majority vote. The
`group.consumer.assignors` config specifies the list of assignors that are
supported on the broker, and is configurable because the interface is
pluggable. In this KIP, the task assignor is not voted on by members
but configured on the broker side. `group.streams.assignor` is used
for this, and takes a specific name. If we make the task assignor
pluggable on the broker side, we'd introduce a separate config
`group.streams.assignors`, which would indeed be a list of class
names. I think there is no conflict here; the two configurations serve
different purposes. The only gripe I'd have here is naming, as
`group.streams.assignor` and `group.streams.assignors` would be a bit
similar, but I cannot really think of a better name for
`group.streams.assignor`, so I'd probably rather introduce
`group.streams.assignors` as `group.streams.possible_assignors` or
something like that.
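
The contrast in AS8 can be sketched in a few lines of Python. This is a simplified illustration and not the coordinator's code: the function names are hypothetical, the assignor names in the usage lines are placeholders, and the tie-breaking rule (prefer the earlier entry in the supported list) and the fallback when nobody votes are assumptions made for the sketch.

```python
from collections import Counter


def pick_consumer_assignor(member_preferences, supported):
    """Consumer groups: members vote, the broker picks the majority choice.

    member_preferences: one preferred assignor name per member (None = no vote).
    supported: ordered list of assignor names enabled on the broker.
    """
    votes = Counter(p for p in member_preferences if p in supported)
    if not votes:
        return supported[0]  # assumed fallback when nobody votes
    best = max(votes.values())
    # Assumed tie-break: the earliest supported assignor with the top count.
    return next(name for name in supported if votes.get(name, 0) == best)


def pick_streams_assignor(broker_config):
    """Streams groups in this KIP: no vote, the broker config names the assignor."""
    return broker_config["group.streams.assignor"]


print(pick_consumer_assignor(["range", "uniform", "uniform", None],
                             ["uniform", "range"]))
print(pick_streams_assignor({"group.streams.assignor": "example-assignor"}))
```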

AS9: I added explanations for the various record types. Apart from the
new topology record and the partition metadata (which is based on the
topology and can only be created once we have a topology initialized),
the lifecycle of the records is basically the same as in KIP-848.

AS10: In the consumer offset topic, the version in the key is used to
differentiate different schema types with the same content. So the
keys are not versioned, but the version field is "abused" as a type
tag. This is the same in KIP-848, we followed it for consistency.
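
A small Python sketch of the "version as type tag" idea in AS10, under stated assumptions: the key is assumed to start with a big-endian 16-bit integer, and the tag numbers and record names below are placeholders, not values from the KIP.

```python
import struct

# Placeholder tag numbers and names, purely for illustration.
KEY_TYPES = {
    100: "ExampleTopologyKey",
    101: "ExamplePartitionMetadataKey",
}


def record_type_of(key_bytes):
    """Read the leading int16 of a key and treat it as a record type tag."""
    (tag,) = struct.unpack_from(">h", key_bytes, 0)
    try:
        return KEY_TYPES[tag]
    except KeyError:
        raise ValueError(f"unknown record type tag: {tag}") from None


# Two keys with the same content (a group ID) but different type tags.
topology_key = struct.pack(">h", 100) + b"group-1"
metadata_key = struct.pack(">h", 101) + b"group-1"
print(record_type_of(topology_key))
print(record_type_of(metadata_key))
```

Because the tag selects the schema rather than a revision of it, two keys carrying identical fields can still map to different record types.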

Cheers,
Lucas


On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
 wrote:
>
> Hi Lucas and Bruno,
>
> Thanks for the great KIP.
>
> I've read through the document and have some initial comments.
>
> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration
> constant. This is a change to the public interface and should be called out.
>
> AS2: Since streams groups are no longer consumer groups, how does the user
> manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh`
> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily
> be extended to support streams groups, but that only lets the user see the
> groups, not manipulate them.
>
> AS3: I wonder whether the streams group state of UNINITIALIZED would be
> better expressed as INITIALIZING.
>
> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
> be nullable.
>
> AS5: Why does StreamsGroupInitialize require CREATE permission on the
> cluster resource? I imagine that this is one of the ways that the request
> might be granted permission to create the StateChangelogTopics and
> RepartitionSourceTopics, but if it is granted permission to create those
> topics with specific ACLs, would CREATE on the cluster resource still be
> required?
>
> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
>
> AS7: A tiny nit. You've used TopologyID (capitals) in
> StreamsGroupHeartbeatRequest and a few others, but in all other cases the
> fields which are ids are spelled Id. I suggest TopologyId.
>
> Also, "interal" is probably meant to be "interval".
>
> AS8: For consumer groups, the `group.consumer.assignors` configuration is a
> list of class names. The assignors do have names too, but the configuration
> which enables them is in terms of class names. I wonder whether the broker’s
> group.streams.assignor could actually be `group.streams.assignors` and
> specified as a list of the class names of the supplied assignors. I know
> you're not supporting other assignors yet, but when you do, I expect you
> would prefer to have used class names from the start.
>
> The use of assignor names in the other places looks good to me.
>
> AS9: I'd find it really helpful to have a bit of a description about the
> purpose and lifecycle of the 9 record types you've introduced on the
> __consumer_offsets topic. I did a cursory review but without really
> understanding what's written when, I can't do a thorough job of reviewing.
>
> AS10: In the definitions of the record keys, such as
> StreamsGroupCurren

[jira] [Resolved] (KAFKA-12877) Fix KRPC files with missing flexibleVersions annotation

2024-07-19 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12877.

Fix Version/s: 3.0.0
   Resolution: Fixed

> Fix KRPC files with missing flexibleVersions annotation
> ---
>
> Key: KAFKA-12877
> URL: https://issues.apache.org/jira/browse/KAFKA-12877
> Project: Kafka
> Issue Type: Bug
> Reporter: Colin McCabe
> Assignee: Colin McCabe
> Priority: Major
> Fix For: 3.0.0
>
>
> Some KRPC files do not specify their flexibleVersions. Unfortunately, in this 
> case, we default to not supporting any flexible versions. This is a poor 
> default, since the flexible format is both more efficient (usually) and 
> flexible.
> Make flexibleVersions explicit and disallow setting anything except "0+" on 
> new RPC and metadata records.





Re: [DISCUSS] KIP-1068: KIP-1068: New JMX Metrics for AsyncKafkaConsumer

2024-07-19 Thread Apoorv Mittal
Hi Brenden,
Thanks for the KIP. The metrics are always helpful.

AM1: Is `application-event-queue-age-avg` enough, or do we require
`application-event-queue-age-max` as well to distinguish outliers?
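
To show why AM1 asks about a max alongside the average, here is a tiny Python sketch with made-up sample values. The function `summarize` is hypothetical and does not reflect how Kafka's metrics library computes its statistics.

```python
def summarize(ages_ms):
    """Return the average and maximum of a list of queue ages in milliseconds."""
    if not ages_ms:
        return {"avg": 0.0, "max": 0.0}
    return {"avg": sum(ages_ms) / len(ages_ms), "max": float(max(ages_ms))}


steady = [38, 40, 42, 40, 40]  # every event waits about 40 ms
spiky = [2, 3, 2, 3, 190]      # most events are fast, one is an outlier

print(summarize(steady))  # {'avg': 40.0, 'max': 42.0}
print(summarize(spiky))   # {'avg': 40.0, 'max': 190.0}
```

Both samples have the same average, so only the max reveals that the second one contains a long stall.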

AM2: The Kafka producer defines the metric `record-queue-time-avg`, which
captures the time spent in the buffer. Do you think we should have a
similar name for `application-event-queue-age-avg`, i.e. change it to
`application-event-queue-time-avg`? Apart from the similar naming,
`time` seems more suitable than `age`, though this is minor. The `time`
usage is also aligned with the description of this metric.

AM3: Metric `application-event-processing-time` says "the average time,
that the consumer network.". Shall we have the `-avg` suffix in the
metric as we have defined for other metrics? Also do we require the max
metric as well for the same?

AM4: Is the telemetry name for `unsent-requests-queue-size` intended
to be `org.apache.kafka.consumer.unsent.requests.size`,
or should it be corrected to
`org.apache.kafka.consumer.unsent.requests.queue.size`?

Regards,
Apoorv Mittal


On Mon, Jul 15, 2024 at 2:45 PM Andrew Schofield 
wrote:

> Hi Brenden,
> Thanks for the updates.
>
> AS4. I see that you’ve added `.ms` to a bunch of the metrics reflecting the
> fact that they’re measured in milliseconds. However, I observe that most
> metrics in Kafka that are measured in milliseconds, with some exceptions in
> Kafka Connect and MirrorMaker, do not follow this convention. I would tend to
> err on the side of consistency with the existing metrics and not use `.ms`.
> However, that’s just my opinion, so I’d be interested to know what other
> reviewers of the KIP think.
>
> Thanks,
> Andrew
>
> > On 12 Jul 2024, at 20:11, Brenden Deluna 
> wrote:
> >
> > Hey Lianet,
> >
> > Thank you for your suggestions and feedback!
> >
> >
> > LM1. This has now been addressed.
> >
> >
> > LM2. I think that would be a valuable addition to the current set of
> > metrics, I will get that added.
> >
> >
> > LM3. Again great idea, that would certainly be helpful. Will add that as
> > well.
> >
> >
> > Let me know if you have any more suggestions!
> >
> >
> > Thanks,
> >
> > Brenden
> >
> > On Fri, Jul 12, 2024 at 2:11 PM Brenden Deluna 
> wrote:
> >
> >> Hi Lucas,
> >>
> >> Thank you for the feedback! I have addressed your comments:
> >>
> >>
> >> LB1. Good catch there, I will update the names as needed.
> >>
> >>
> >> LB2. Good catch again! I will update the name to be more consistent.
> >>
> >>
> >> LB3. Thank you for pointing this out, I realized that all metric values
> >> will actually be set to 0. I will specify this and explain why they will
> >> be 0.
> >>
> >>
> >> Nit: This metric is referring to the queue of unsent requests in the
> >> NetworkClientDelegate. For the metric descriptions I am trying to not
> >> include too much of the implementation details, hence the reason that
> >> description is quite short. I cannot think of other ways to describe the
> >> metric without going deeper into the implementation, but please do let me
> >> know if you have any ideas.
> >>
> >>
> >> Thank you,
> >>
> >> Brenden
> >>
> >> On Fri, Jul 12, 2024 at 1:27 PM Lianet M.  wrote:
> >>
> >>> Hey Brenden, thanks for the KIP! Great to get more visibility into the
> >>> new consumer.
> >>>
> >>> LM1. +1 on Lucas's suggestion for including the unit in the name, seems
> >>> clearer and consistent (I do see several time metrics including ms)
> >>>
> >>> LM2. What about a new metric for application-event-queue-time-ms. It
> >>> would be a complement to the application-event-queue-size you're
> >>> proposing, and it will tell us how long the events sit in the queue,
> >>> waiting to be processed (from the time the API call adds the event to the
> >>> queue, to the time it's processed in the background thread). I find it
> >>> would be very interesting.
> >>>
> >>> LM3. Thinking about the actual usage of
> >>> "time-between-network-thread-poll-xxx" metric, I imagine it would be
> >>> helpful to know more about what could be impacting it. As I see it, the
> >>> network thread cadence could be mainly impacted by: 1- app event
> >>> processing (generate requests), 2- network client poll (actual
> >>> send/receive). For 2, the new consumer reuses the same component as the
> >>> legacy one, but 1 is specific to the new consumer, so what about a metric
> >>> for application-event-processing-time-ms (we could consider avg I would
> >>> say). It would be the time that the network thread takes to process all
> >>> available events on each run.
> >>>
> >>> Cheers!
> >>> Lianet
> >>>
> >>> On Fri, Jul 12, 2024 at 1:57 PM Lucas Brutschy
> >>>  wrote:
> >>>
> >>>> Hey Brenden,
> >>>>
> >>>> thanks for the KIP! These will be great to observe and debug the
> >>>> background thread of the new consumer.
> >>>>
> >>>> LB1. `time-between-network-thread-poll-max` → I see several similar
> >>>> metrics including the uni

Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol

2024-07-19 Thread Andrew Schofield
Hi Lucas,
Thanks for your response. It all makes sense to me, with just a couple
of follow-up comments.

AS8: So, really the broker config is the name of the default assignor
used unless it’s overridden by a group config. I have one suggestion,
which you can of course ignore, that you use
`group.streams.default.assignor.name` on the broker,
`group.streams.assignor.name` on the group,
`group.remote.assignor.name` on the application, and when you permit
pluggable assignors, you use `group.streams.assignors` for the list of
class names.
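
A minimal Python sketch of the override rule suggested in AS8: the group-level setting wins if present, otherwise the broker default applies. The function name is hypothetical and the assignor names in the usage lines are placeholders; the two config keys are the ones suggested above, which the KIP may or may not adopt.

```python
def resolve_streams_assignor(broker_config, group_config):
    """Pick the assignor name: the group override first, then the broker default."""
    name = group_config.get("group.streams.assignor.name")
    if name is not None:
        return name
    return broker_config["group.streams.default.assignor.name"]


broker = {"group.streams.default.assignor.name": "example-default"}

# No group-level config: the broker default applies.
print(resolve_streams_assignor(broker, {}))
# A group-level config overrides the broker default.
print(resolve_streams_assignor(
    broker, {"group.streams.assignor.name": "example-override"}))
```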

I’d prefer if we’d used `group.*.assignor.class.names` for all group types
but that ship has sailed.

AS10: Yes, exactly. In practice, you’ll assign the numbers as you implement
the KIP. Looks good to me.


I’ll take another look now the records are more fully described.

Thanks,
Andrew


Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol

2024-07-19 Thread Andrew Schofield
Hi Lucas,
I see that I hit send too quickly. One more comment:

AS2: I think stating that there will be a `kafka-streams-groups.sh` in a
future KIP is fine to keep this KIP focused. Personally, I would probably
put all of the gory details in this KIP, but then it’s not my KIP. A future
pointer is fine too.

Thanks,
Andrew



Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol

2024-07-19 Thread Lucas Brutschy
Hi Andrew,

AS2: I added a note for now. If others feel strongly about it, we can
still add more administrative tools to the KIP - it should not change
the overall story significantly.

AS8: `group.streams.assignor.name` sounds good to me to distinguish
the config from class names. Not sure if I like the "default". To be
consistent, we'd then have to call it
`group.streams.default.session.timeout.ms` as well. I only added the
`.name` on both broker and group level for now.

AS10: Ah, I misread your comment, now I know what you meant. Good
point, fixed (by Bruno).

Cheers,
Lucas

On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
 wrote:
>
> Hi Lucas,
> I see that I hit send too quickly. One more comment:
>
> AS2: I think stating that there will be a `kafka-streams-group.sh` in a
> future KIP is fine to keep this KIP focused. Personally, I would probably
> put all of the gory details in this KIP, but then it’s not my KIP. A future
> pointer is fine too.
>
> Thanks,
> Andrew
>
>
> > On 19 Jul 2024, at 13:46, Lucas Brutschy  
> > wrote:
> >
> > Hi Andrew,
> >
> > thanks for getting the discussion going! Here are my responses.
> >
> > AS1: Good point, done.
> >
> > AS2: We were planning to add more administrative tools to the
> > interface in a follow-up KIP, to not make this KIP too large. If
> > people think that it would help to understand the overall picture if
> > we already add something like `kafka-streams-groups.sh`, we will do
> > that. I also agree that we should address how this relates to
> > KIP-1043, we'll add it.
> >
> > AS3: Good idea, that's more consistent with `assigning` and `reconciling` 
> > etc.
> >
> > AS4: Thanks, Fixed.
> >
> > AS5: Good catch. This was supposed to mean that we require CREATE on
> > cluster or CREATE on all topics, not both. Fixed.
> >
> > AS6: Thanks, Fixed.
> >
> > AS7. Thanks, Fixed.
> >
> > AS8: I think this works a bit different in this KIP than in consumer
> > groups. KIP-848 lets the members vote for a preferred assignor, and
> > the broker-side assignor is picked by majority vote. The
> > `group.consumer.assignors` specifies the list of assignors that are
> > supported on the broker, and is configurable because the interface is
> > pluggable. In this KIP, the task assignor is not voted on by members
> > but configured on the broker-side. `group.streams.assignor` is used
> > for this, and uses a specific name. If we'll make the task assignor
> > pluggable on the broker-side, we'd introduce a separate config
> > `group.streams.assignors`, which would indeed be a list of class
> > names. I think there is no conflict here, the two configurations serve
> > different purposes.  The only gripe I'd have here is naming as
> > `group.streams.assignor` and `group.streams.assignors` would be a bit
> > similar, but I cannot really think of a better name for
> > `group.streams.assignor`, so I'd probably rather introduce
> > `group.streams.assignors`  as `group.streams.possible_assignors`  or
> > something like that.
> >
> > AS9: I added explanations for the various record types. Apart from the
> > new topology record, and the partition metadata (which is based on the
> > topology and can only be created once we have a topology initialized)
> > the lifecycle for the records is basically the same as in KIP-848.
> >
> > AS10: In the consumer offset topic, the version in the key is used to
> > differentiate different schema types with the same content. So the
> > keys are not versioned, but the version field is "abused" as a type
> > tag. This is the same in KIP-848, we followed it for consistency.
> >
> > Cheers,
> > Lucas
> >
> >
> > On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
> >  wrote:
> >>
> >> Hi Lucas and Bruno,
> >>
> >> Thanks for the great KIP.
> >>
> >> I've read through the document and have some initial comments.
> >>
> >> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS
> >> enumeration constant. This is a change to the public interface and should
> >> be called out.
> >>
> >> AS2: Since streams groups are no longer consumer groups, how does the user
> >> manipulate them, observe lag and so on? Will you add
> >> `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`?
> >> Of course, KIP-1043 can easily be extended to support streams groups, but
> >> that only lets the user see the groups, not manipulate them.
> >>
> >> AS3: I wonder whether the streams group state of UNINITIALIZED would be
> >> better expressed as INITIALIZING.
> >>
> >> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
> >> be nullable.
> >>
> >> AS5: Why does StreamsGroupInitialize require CREATE permission on the
> >> cluster resource? I imagine that this is one of the ways that the request
> >> might be granted permission to create the StateChangelogTopics and
> >> RepartitionSourceTopics, but if it is granted permission to create those
> >> topics with specific ACLs, would CREATE on the cluster reso

[jira] [Created] (KAFKA-17169) Add an EndpointsTest suite that test all of the public methods

2024-07-19 Thread Jira
José Armando García Sancio created KAFKA-17169:
--

 Summary: Add an EndpointsTest suite that test all of the public 
methods
 Key: KAFKA-17169
 URL: https://issues.apache.org/jira/browse/KAFKA-17169
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1068: New JMX Metrics for AsyncKafkaConsumer

2024-07-19 Thread Brenden Deluna
Hi Apoorv,
Thank you for your comments, I will address each.

AM1. I can see the usefulness in also having an
'application-event-queue-age-max' to get an idea of outliers and how they
may be affecting the average metric. I will add that.

AM2. I agree with you there, I think 'time' is a better descriptor here
than 'age'. I will update those metric names as well.

AM3. Similar to above comments, I will change the name of that metric to be
more consistent. And I think a max metric would also be useful here, adding
that.

AM4. Yes, good catch there. Will update that as well.

Thank you,
Brenden

On Fri, Jul 19, 2024 at 8:14 AM Apoorv Mittal 
wrote:

> Hi Brendan,
> Thanks for the KIP. The metrics are always helpful.
>
> AM1: Is `application-event-queue-age-avg` enough or do we require
> `application-event-queue-age-max` as well to differentiate with outliers?
>
> AM2: The kafka producer defines metric `record-queue-time-avg` which
> captures the time spent in the buffer. Do you think we should have a
> similar name for `application-event-queue-age-avg` i.e. change to
> `application-event-queue-time-avg`? Moreover other than similar naming,
> `time` anyways seems more suitable than `age`, though minor. The `time`
> usage is also aligned with the description of this metric.
>
> AM3: Metric `application-event-processing-time` says "the average time,
> that the consumer network.". Shall we have the `-avg` suffix in the
> metric as we have defined for other metrics? Also do we require the max
> metric as well for the same?
>
> AM4: Is the telemetry name for `unsent-requests-queue-size` intended
> as `org.apache.kafka.consumer.unsent.requests.size`,
> or should it be corrected to
> `org.apache.kafka.consumer.unsent.requests.queue.size`?
>
> Regards,
> Apoorv Mittal
>
>
> On Mon, Jul 15, 2024 at 2:45 PM Andrew Schofield <
> andrew_schofi...@live.com>
> wrote:
>
> > Hi Brenden,
> > Thanks for the updates.
> >
> > AS4. I see that you’ve added `.ms` to a bunch of the metrics, reflecting the
> > fact that they’re measured in milliseconds. However, I observe that most
> > metrics in Kafka that are measured in milliseconds, with some exceptions in
> > Kafka Connect and MirrorMaker, do not follow this convention. I would tend to
> > err on the side of consistency with the existing metrics and not use `.ms`.
> > However, that’s just my opinion, so I’d be interested to know what other
> > reviewers of the KIP think.
> >
> > Thanks,
> > Andrew
> >
> > > On 12 Jul 2024, at 20:11, Brenden Deluna wrote:
> > >
> > > Hey Lianet,
> > >
> > > Thank you for your suggestions and feedback!
> > >
> > >
> > > LM1. This has now been addressed.
> > >
> > >
> > > LM2. I think that would be a valuable addition to the current set of
> > > metrics, I will get that added.
> > >
> > >
> > > > LM3. Again great idea, that would certainly be helpful. Will add that as
> > > > well.
> > >
> > >
> > > Let me know if you have any more suggestions!
> > >
> > >
> > > Thanks,
> > >
> > > Brenden
> > >
> > > On Fri, Jul 12, 2024 at 2:11 PM Brenden Deluna wrote:
> > >
> > >> Hi Lucas,
> > >>
> > >> Thank you for the feedback! I have addressed your comments:
> > >>
> > >>
> > >> LB1. Good catch there, I will update the names as needed.
> > >>
> > >>
> > >> LB2. Good catch again! I will update the name to be more consistent.
> > >>
> > >>
> > >> LB3. Thank you for pointing this out, I realized that all metric values
> > >> will actually be set to 0. I will specify this and explain why they will
> > >> be 0.
> > >>
> > >>
> > >> Nit: This metric is referring to the queue of unsent requests in the
> > >> NetworkClientDelegate. For the metric descriptions I am trying to not
> > >> include too much of the implementation details, hence the reason that
> > >> description is quite short. I cannot think of other ways to describe the
> > >> metric without going deeper into the implementation, but please do let me
> > >> know if you have any ideas.
> > >>
> > >>
> > >> Thank you,
> > >>
> > >> Brenden
> > >>
> > >> On Fri, Jul 12, 2024 at 1:27 PM Lianet M.  wrote:
> > >>
> > >>> Hey Brenden, thanks for the KIP! Great to get more visibility into the
> > >>> new consumer.
> > >>>
> > >>> LM1. +1 on Lucas's suggestion for including the unit in the name, seems
> > >>> clearer and consistent (I do see several time metrics including ms)
> > >>>
> > >>> LM2. What about a new metric for application-event-queue-time-ms. It would
> > >>> be a complement to the application-event-queue-size you're proposing, and
> > >>> it will tell us how long the events sit in the queue, waiting to be
> > >>> processed (from the time the API call adds the event to the queue, to the
> > >>> time it's processed in the background thread). I find it would be very
> > >>> interesting.
> > >>>
> > >>> LM3. Thinking about the actual usage of
> > >>> "time-between-network-thread-poll-xxx" metric, I imagine it wou

Re: [DISCUSS] KIP-1059: Enable the Producer flush() method to clear the latest send() error

2024-07-19 Thread Matthias J. Sax

For catching client side errors this would work IMHO. I am ok with this.

We throw before we add the record to the batch. Very clean semantics 
which should also address the concern of "non-atomic tx"... The 
exception clearly indicates that the record was not added to the TX, and 
users can react to it accordingly.


We did discuss this idea previously, but did not have a good proposal to 
make it backward compatible. The newly proposed overload would address 
this issue of backward compatibility.


Of course, it might not make it easily extensible in the future for 
broker side errors, but it's unclear anyway right now, if we would even 
get to a solution for broker side errors or not -- so maybe it's ok to 
accept this and drop/ignore the broker side error question for now.




A small follow up thought/question: instead of using a boolean, would we 
actually want to make it a var-arg enum to allow users to enable this 
for certain errors explicitly and individually? Besides the added
flexibility and fine-grained control, a var-arg enum would also make the
API nicer/cleaner IMHO compared to a boolean.


For convenience, this enum could have an additional `ALL` option (and we 
would call out that if `ALL` is used, new error types might be added in 
future releases, making the code less safe/robust -- ie, use at your own
risk only)


This way, we also explicitly document what exception might be thrown in 
the KIP, as we would add an enum for each error type explicitly, and 
also make it future-proof for new error types we want to cover -- each
addition would require a KIP to extend the enum.
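
To make the var-arg enum idea concrete, here is a minimal sketch under stated assumptions. Everything in it is hypothetical: the `SendErrorType` enum, its constants, and the `VarArgProducerSketch` class are illustrative names only and are not part of the Kafka client API or of the KIP text. The old behaviour (error via Future) is modelled as a plain `false` return to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical error categories; the real list would be fixed by the KIP.
enum SendErrorType {
    RECORD_TOO_LARGE,
    SERIALIZATION,
    ALL // convenience option: opts in to every current and future type
}

// Stand-in for the producer; this is NOT the real KafkaProducer.
final class VarArgProducerSketch {
    private final int maxRecordBytes;

    VarArgProducerSketch(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Returns true if the record was accepted. Throws immediately only for
    // error types the caller opted in to; otherwise the rejection is reported
    // "the old way" (modelled here as returning false).
    boolean send(byte[] value, SendErrorType... throwOn) {
        Set<SendErrorType> enabled = EnumSet.noneOf(SendErrorType.class);
        enabled.addAll(Arrays.asList(throwOn));
        if (value.length > maxRecordBytes) {
            if (enabled.contains(SendErrorType.ALL)
                    || enabled.contains(SendErrorType.RECORD_TOO_LARGE)) {
                throw new IllegalArgumentException(
                        "record too large: " + value.length + " bytes");
            }
            return false;
        }
        return true;
    }
}
```

A caller that only wants oversize records surfaced eagerly would write `producer.send(value, SendErrorType.RECORD_TOO_LARGE)`, which documents the handled error class at the call site in a way a bare `true` cannot.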




-Matthias


On 7/18/24 10:33 PM, Artem Livshits wrote:

Hey folks,

Hopefully not to make this KIP go for another spin :-), but I thought of a
modification that might actually address safety concerns over using flags
to ignore a vaguely specified class of errors.

What if we had the following overload of the send() method:

    void send(ProducerRecord record, Callback callback, boolean throwImmediately)

and if throwImmediately=false, then we behave the same way as now (return
errors via Future and poison transaction) and if throwImmediately=true then
we just throw errors immediately from the send function.  This way we don't
ignore the error, we're just changing the way it is delivered.  Then
KStreams can catch the error for send(record, callback, true) and do
whatever it needs to do.
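
A toy model of the two delivery paths may help here. It is only a sketch: `ToyTxProducer`, its size check, and the "poisoned" flag are simplified stand-ins invented for illustration, not the real KafkaProducer, and the callback is modelled as a plain `Consumer<Exception>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the proposed overload; NOT the real KafkaProducer.
final class ToyTxProducer {
    private final int maxRecordBytes;
    private final List<byte[]> batch = new ArrayList<>();
    private boolean poisoned = false;

    ToyTxProducer(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    void send(byte[] record, Consumer<Exception> callback, boolean throwImmediately) {
        if (record.length > maxRecordBytes) {
            IllegalArgumentException error =
                    new IllegalArgumentException("record too large");
            if (throwImmediately) {
                // Thrown before the record is added to the batch, so the
                // transaction itself stays usable.
                throw error;
            }
            // Behaviour described in the thread for the existing path:
            // report the error asynchronously and poison the transaction.
            poisoned = true;
            callback.accept(error);
            return;
        }
        batch.add(record);
    }

    boolean isPoisoned() {
        return poisoned;
    }

    int batchedRecords() {
        return batch.size();
    }
}
```

With `throwImmediately=true` the caller wraps `send(...)` in a try/catch and decides what to do with the record, while the batch and the transaction state are untouched by the failed call.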

-Artem


On Mon, Jul 15, 2024 at 4:30 PM Greg Harris 
wrote:


Matthias,

Thank you for rejecting my suggested alternatives. Your responses are the
sorts of things I expected to see summarized in the text of the KIP.

I agree with most of your rejections, except this one:


"Estimation" is not sufficient, but we would need to know it exactly.
And that's an impl detail, given that the message format could change
and we could add new internal fields increasing the message size.


An estimate is certainly going to have an error. But an estimate shouldn't
be treated as exact anyway, there should be an error bound, or "safety
factor" used when interpreting it. For example, if the broker side limit is
1MB, and an estimate could be wrong by ~10%, then computing an estimate and
dropping records >900kb should be sufficient to prevent RTLEs.
This is the sort of estimation that I would expect application developers
could implement, without knowing the exact serialization and protocol
overhead. This could prevent user-originated oversize records from making
it to the producer.
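
As a rough illustration of that kind of application-side estimate, here is a sketch. The `RecordSizeGuard` class is hypothetical and not part of any Kafka API; the estimate counts only serialized key and value bytes, and 1MB is taken as 1,000,000 bytes purely for the arithmetic.

```java
// Hypothetical application-side guard; not part of the Kafka client API.
final class RecordSizeGuard {
    private final long brokerLimitBytes;
    private final int errorPercent; // how far off the estimate may be, in percent

    RecordSizeGuard(long brokerLimitBytes, int errorPercent) {
        this.brokerLimitBytes = brokerLimitBytes;
        this.errorPercent = errorPercent;
    }

    // Rough estimate: serialized key plus value only. Headers and protocol
    // overhead are deliberately ignored; the safety factor has to absorb them.
    static long estimateBytes(byte[] key, byte[] value) {
        long keyBytes = key == null ? 0 : key.length;
        long valueBytes = value == null ? 0 : value.length;
        return keyBytes + valueBytes;
    }

    // Largest estimated size that is still passed on to the producer.
    long threshold() {
        return brokerLimitBytes * (100 - errorPercent) / 100;
    }

    boolean shouldDrop(byte[] key, byte[] value) {
        return estimateBytes(key, value) > threshold();
    }
}
```

With a limit of 1,000,000 bytes and a 10% error bound, `new RecordSizeGuard(1_000_000L, 10).threshold()` is 900,000 bytes, matching the ">900kb" cut-off in the example above.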

Thanks,
Greg


On Mon, Jul 15, 2024 at 4:08 PM Matthias J. Sax  wrote:


I agree with Alieh and Artem -- in the end, why buffer records twice? We
effectively want to allow pushing some error handling (which I btw
consider "business logic") into the producer. IMHO, there is nothing
wrong with it. Dropping a poison pill record is not really a violation of
atomicity from my POV, but a business logic decision to not include a
record in a transaction -- the proposed API just makes it much simpler
to achieve this business logic goal.



For memory size estimation, throughput or message size is actually not
relevant, right? We would need to look at producer buffer size, ie,
`batch.size`, `max.in.flight.requests.per.connection` and guesstimate the
number of connections there might be? At least for KS, we don't need to
buffer everything until commit, but only until we get a successful "ack"
back.

Note that KS applications not only need to write to (a single) user
result topic, but multiple output topics, as well as repartition and
changelog topics, across all tasks assigned to a thread (ie, producer),
which can easily be 10 tasks or more. If we assume topics with 30
partitions (topics with 50 or more partitions are not uncommon either),
and a producer who must write to 10 different topics, the number of
required connections is very quickly very high, and thus the required
"application buffer space" would be significant.



Your other ideas seem not to be viable alternatives:


S

Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol

2024-07-19 Thread Lianet M.
Hi Lucas/Bruno, thanks for the great KIP! First comments:

LM1. Related to where the KIP says: “Group ID, member ID, member epoch
are sent with each heartbeat request. Any other information that has not
changed since the last heartbeat can be omitted.” I expect all the other
info also needs to be sent whenever a full heartbeat is required (even if
it didn’t change from the last heartbeat), e.g. in fencing scenarios,
correct?

LM2. For consumer groups we always send the groupInstanceId (if any) as
part of every heartbeat, along with memberId, epoch and groupId. Should we
consider that too here?

LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
the stream-specific RPCs if the groupId is associated with a group type
that is not streams (ie. consumer group or share group). I wonder if at
this point, where we're getting several new group types added, each with
RPCs that are supposed to include groupId of a certain type, we should be
more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
(group exists but not with a valid type for this RPC) vs a
GROUP_ID_NOT_FOUND (group does not exist).  Those errors would be
consistently used across consumer, share, and streams RPCs whenever the
group id is not of the expected type.
This is truly not specific to this KIP, and should be addressed with all
group types and their RPCs in mind. I just wanted to bring out my concern
and get thoughts around it.

LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
groupId is empty. There is already an INVALID_GROUP_ID error, that seems
more specific to this situation. Error handling of specific errors would
definitely be easier than having to deal with a generic INVALID_REQUEST
(and probably its custom message). I know that for KIP-848 we have
INVALID_REQUEST for similar situations, so if we ever go down this path
we should review it there too for consistency. Thoughts?

LM5. The dependency between the StreamsGroupHeartbeat RPC and the
StreamsGroupInitialize RPC is one-way only right? HB requires a previous
StreamsGroupInitialize request, but StreamsGroupInitialize processing is
totally independent of heartbeats (and could perfectly be processed without
a previous HB, even though the client implementation we’re proposing won’t
go down that path). Is my understanding correct? Just to double check,
seems sensible like that at the protocol level.

LM6. With KIP-848, there is an important improvement that brings a
difference in behaviour around the static membership: with the classic
protocol, if a static member joins with a group instance already in use, it
makes the initial member fail with a FENCED_INSTANCE_ID exception, vs.
with the new consumer group protocol, the second member trying to join
fails with an UNRELEASED_INSTANCE_ID. Does this change need to be
considered in any way for the streams app? (I'm not familiar with KS yet,
but thought it was worth asking. If it doesn't affect KS in any way, it may
still be helpful to call it out in a section on static membership.)

LM7. Regarding the admin tool to manage streams groups. We can discuss
whether to have it here or separately, but I think we should aim for some
basic admin capabilities from the start, mainly because I believe it will
be very helpful/needed in practice during the impl of the KIP. From
experience with KIP-848, we felt a bit blindfolded in the initial phase
where we still didn't have kafka-consumer-groups dealing with the new
groups (and then it was very helpful and used when we were able to easily
inspect them from the console)

LM8. nit: the links to KIP-848 are not quite right (pointing to an
unrelated “Future work section” at the end of KIP-848)
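
To illustrate the distinction raised in LM3, here is a small sketch of how a group lookup could separate the two cases. It is purely illustrative: INVALID_GROUP_TYPE is only the error name floated in LM3, and the enums and the `GroupRegistrySketch` class are made up for this example rather than taken from the broker code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative group types and error codes; not the broker's real enums.
enum SketchGroupType { CLASSIC, CONSUMER, SHARE, STREAMS }

enum SketchError { NONE, GROUP_ID_NOT_FOUND, INVALID_GROUP_TYPE }

final class GroupRegistrySketch {
    private final Map<String, SketchGroupType> groups = new HashMap<>();

    void register(String groupId, SketchGroupType type) {
        groups.put(groupId, type);
    }

    // Validates that groupId exists and has the type the RPC expects.
    SketchError validate(String groupId, SketchGroupType expected) {
        SketchGroupType actual = groups.get(groupId);
        if (actual == null) {
            return SketchError.GROUP_ID_NOT_FOUND; // group does not exist
        }
        if (actual != expected) {
            return SketchError.INVALID_GROUP_TYPE; // exists, but wrong type for this RPC
        }
        return SketchError.NONE;
    }
}
```

A streams RPC sent to a group id registered as a share group would then get INVALID_GROUP_TYPE, while an unknown group id would still get GROUP_ID_NOT_FOUND.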

Thanks!

Lianet


On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
 wrote:

> Hi Andrew,
>
> AS2: I added a note for now. If others feel strongly about it, we can
> still add more administrative tools to the KIP - it should not change
> the overall story significantly.
>
> AS8: "streams.group.assignor.name" sounds good to me to distinguish
> the config from class names. Not sure if I like the "default". To be
> consistent, we'd then have to call it
> `group.streams.default.session.timeout.ms` as well. I only added the
> `.name` on both broker and group level for now.
>
> AS10: Ah, I misread your comment, now I know what you meant. Good
> point, fixed (by Bruno).
>
> Cheers,
> Lucas
>
> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
>  wrote:
> >
> > Hi Lucas,
> > I see that I hit send too quickly. One more comment:
> >
> > AS2: I think stating that there will be a `kafka-streams-group.sh` in a
> > future KIP is fine to keep this KIP focused. Personally, I would probably
> > put all of the gory details in this KIP, but then it’s not my KIP. A future
> > pointer is fine too.
> >
> > Thanks,
> > Andrew
> >
> >
> > > On 19 Jul 2024, at 13:46, Lucas Brutschy wrote:
> > >
> > > Hi Andrew,
> > >
> > > thanks for getting the discussion going! Here are my responses

[jira] [Created] (KAFKA-17170) Add test to ensure new consumer acks reconciled assignment even if first HB with ack lost

2024-07-19 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-17170:
--

 Summary: Add test to ensure new consumer acks reconciled 
assignment even if first HB with ack lost
 Key: KAFKA-17170
 URL: https://issues.apache.org/jira/browse/KAFKA-17170
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


When a consumer reconciles an assignment, it transitions to ACKNOWLEDGING, so 
that a HB is sent on the next manager poll, without waiting for the interval. 
The consumer transitions out of this ack state as soon as it sends the 
heartbeat, without waiting for a response. This is based on the expectation 
that following heartbeats (sent on the interval) will act as ack, including the 
set of partitions even in case the first ack is lost. This is the expected flow:
 # complete reconciliation and send HB1 to ack assignment tp0 
 # send following HBs on interval (won't include tp0, given that it notices
its value did not change since last HB)
 # HB1 times out (or fails in any way) => heartbeat request manager resets the
sentFields to null (HeartbeatState.reset(), triggered if the request fails, or
if it gets a response with an Error)
 # following HB will include tp0 (and act as ack), because it will notice that 
tp0 != null (last value sent)

This seems not to be covered by any test, so we should add a unit test to the 
HeartbeatRequestManager, to ensure that the HB generated in step 4 above 
includes tp0 as I expect :)

This flow is important because if the consumer fails to send the reconciled
partitions in a HB, the broker would remain waiting for an ack that the member
considers already sent (the broker would wait for the rebalance timeout and
then reassign those partitions).
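
The four-step flow above can be sketched with a simplified model of the "only send what changed" logic. This is an assumption-laden illustration: `HeartbeatStateSketch` is a made-up stand-in, not the actual HeartbeatRequestManager or HeartbeatState code, and it tracks only the assigned partitions.

```java
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for the consumer's heartbeat state; illustrative only.
final class HeartbeatStateSketch {
    // Last partitions value sent in a heartbeat; null means "nothing sent".
    private Set<String> lastSentPartitions = null;

    // Returns the partitions to include in the next heartbeat, or null when
    // the field can be omitted because it did not change since the last HB.
    Set<String> buildHeartbeat(Set<String> currentAssignment) {
        if (Objects.equals(currentAssignment, lastSentPartitions)) {
            return null;
        }
        lastSentPartitions = currentAssignment;
        return currentAssignment;
    }

    // Called when a heartbeat fails, times out, or gets an error response,
    // so that the next heartbeat carries all fields again.
    void reset() {
        lastSentPartitions = null;
    }
}
```

In this model HB1 includes tp0, the following heartbeat omits it, and after `reset()` the next heartbeat includes tp0 again, which is the step 4 behaviour the proposed unit test would assert.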





Re: [DISCUSS] KIP-1059: Enable the Producer flush() method to clear the latest send() error

2024-07-19 Thread Greg Harris
Hi Artem and Matthias,

> On the other hand, the effort to prove that
> keeping all records in memory won't break some scenarios (and generally
> breaking one is enough to cause a lot of pain) seems to be significantly
> higher than to prove that setting some flag in some API has pretty much 0
> chance of regression

> in the end, why buffer records twice?

> This way we don't
> ignore the error, we're just changing the method they are delivered.

> Very clean semantics
> which should also address the concern of "non-atomic tx"

I feel like my concerns are being minimized instead of being addressed
in this discussion, and if that's because I'm not expressing them clearly,
I apologize.

Many users come to Kafka with prior expectations, especially when we use
industry-standard terminology like "Exactly Once Semantics",
"Transactions", "Commit", "Abort". Of course Kafka isn't an ACID-compliant
database, but users will evaluate, discuss, and develop applications with
Kafka through the lens of the ACID principles, because that is the
framework most commonly applied to transactional semantics.
The original design of KIP-98 [1] explicitly mentions atomic commits (with
the same meaning as the A in ACID) as the primary abstraction being added
(reproduced here):

> At the core, transactional guarantees enable applications to produce to
> multiple TopicPartitions atomically, ie. all writes to these
> TopicPartitions will succeed or fail as a unit.
> Further, since consumer progress is recorded as a write to the offsets
> topic, the above capability is leveraged to enable applications to batch
> consumed and produced messages into a single atomic unit, ie. a set of
> messages may be considered consumed only if the entire
> ‘consume-transform-produce’ executed in its entirety.

I think it's important to say that to a user, "writes" really means "send()
and commitOffsets() calls", not literal produce requests to Kafka brokers,
and "consume-transform-produce" really means "poll(), transform, send()".
This is because to a user, the implementation within poll() and send() and
the broker are none of their concern, and are intended to be within the
abstraction.
When I say that this feature is a non-atomic commit, I mean that this
feature does not fit the above description, and breaks the transaction
abstraction in a meaningful way. No longer do all writes succeed or fail as
a unit, some failures are permitted to drop data. No longer must a
consume-transform-produce cycle be executed in its entirety, some parts may
be left incomplete.
This means that this method will be difficult to define ("which exceptions
are covered?"), difficult to document ("how do we explain
'not-really-atomic commits' clearly and unambiguously to a potential
user?"), and difficult to compose ("if someone turns this option on, how
does that affect delivery guarantees and opportunities for bugs in
upper layers?").
Users currently rely heavily on analogies to other database systems to make
sense of Kafka's transactions, and we need to use that to our benefit,
rather than designing in spite of it being true.

However this atomicity guarantee isn't always desirable, as evidenced by
the original bug report [2]. If you're interacting with a website form for
example, and a database transaction fails because one of your strings is
oversize, you don't need to re-input all of your form responses from
scratch, as there is an application layer/browser in-between to preserve
the state and retry the transaction.
And while you could make a convenience/performance/etc argument in that
situation ("The database should truncate/null-out the oversize string") and
modern databases often have very expressive DML that would permit such a
behavior (try-catch, etc), the End-to-End arguments [3] make me believe
that is a bad design and should be discouraged.
To that end, I was suggesting ways to push this farther and farther up the
stack, such as performing record size estimation. This doesn't mean that it
can't be added at a low level of abstraction, just that we need to make
sure to exhaust all other alternatives, and justify it with a performance
benefit.

I was holding off on discussing the literal design until you provided
concrete performance justification, but to progress the discussion while
I'm waiting for that, I can give my thoughts:

I don't think an overloaded send() method is appropriate given that this
appears to be a niche use-case, and the send() method itself is probably
the single most important method in the Clients library. The KIP-98 design
was a much more substantial change to the Producer than this KIP, and it
found a way to preserve the original type signature (but added an
exception).
Users picking up the Producer for the first time may see this additional
method, and may spend time trying to understand whether it is something
suitable for their use-case. In the best case, they ignore it and use the
other two signatures. But it is also possible that they will use