Re: [DISCUSS] KIP-1043: Administration of groups
Hi Apoorv,

Thanks for your comments.

AM1: I chose to leave the majority of the administration for the different types of groups in their own tools. The differences between the group types are significant and I think that one uber tool that subsumes kafka-consumer-groups.sh, kafka-share-groups.sh and kafka-streams-application-reset.sh would be too overwhelming and difficult to use. For example, the output from describing a consumer group is not the same as the output from describing a share group.

AM2: I think you’re highlighting some of the effects of the evolution of groups. The classic consumer group protocol defined the idea of protocol as a way of distinguishing between the various ways people had extended the base protocol - “consumer”, “connect” and “sr” are the main ones I’ve seen, and the special “” for groups that are not using member assignment. For the modern group protocol, each of the proposed implementations brings its own use of the protocol string - “consumer”, “share” and “streams”.

Now, prior to AK 4.0, in order to make the console consumer use the new group protocol, you set `--consumer-property group.protocol=consumer`. This tells a factory method in the consumer to use the AsyncKafkaConsumer (group type is Consumer, protocol is “consumer”) as opposed to the LegacyKafkaConsumer (group type is Classic, protocol is “consumer”). In AK 4.0, the default group protocol will change and setting the property will not be necessary. The name of the configuration “group.protocol” is slightly misleading. In practice, this is most likely to be used pre-AK 4.0 by people wanting to try out the new consumer.

AM3: When you try to create a share group and that group ID is already in use by another type of group, the error message is “Group CG1 is not a share group”. It exists already, with the wrong type.
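For reference, the AM2 opt-in described above looks like this on the command line. This is a CLI fragment rather than runnable code; the topic name, group ID and bootstrap address are illustrative placeholders:

```shell
# Pre-AK 4.0: opt the console consumer into the new (KIP-848) group protocol.
# Topic, group ID and bootstrap address below are placeholders.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --group CG1 \
  --consumer-property group.protocol=consumer
```

From AK 4.0 onwards the property should no longer be needed, since the new protocol becomes the default.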
AM4: This KIP changes the error behaviour for `kafka-consumer-groups.sh` and `kafka-share-groups.sh` such that any operation on a group that finds the group type is incorrect reports “Error: Group XXX is not a consumer group” or equivalent for the other group types. This change makes things much easier to understand than they are today.

AM5: That section is just clarifying what the behaviour is. I don’t think it had been written down before.

Thanks,
Andrew

> On 18 Jul 2024, at 16:43, Apoorv Mittal wrote:
>
> Hi Andrew,
> Thanks for the KIP. The group administration is getting difficult with new
> types of groups being added and certainly the proposal looks great.
> I have some questions:
>
> AM1: The current proposal defines the behaviour for listing and describing
> groups, simplifying create for `kafka-share-groups.sh`. Do we want to leave
> the other group administration like delete to respective groups or shall
> have common behaviour defined i.e. leave to respective
> kafka-consumer-groups.sh or kafka-share-groups.sh?
>
> AM2: Reading the notes on KIP-848,
> https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes,
> which requires `--consumer-property group.protocol=consumer` to enable
> modern consumer group. But the listing for `classic` "type" also defines
> "protocol" as `consumer` in some scenarios. Is it intended or `classic`
> type should different protocol?
>
> AM3: The KIP adds behaviour for `kafka-share-groups.sh` which defines the
> `--create` option. Though it simplifies group creation, what should be the
> error behaviour when the group with the same name exists but not of "share"
> group type?
>
> AM4: The GroupMetadataManager.java stores all groups in the same data
> structure which means the name has to be unique across different group
> types.
> Do you think we should also change the error message for existing
> kafka-consumer-groups.sh and kafka-share-groups.sh to recommend using new
> kafka-groups.sh for extensive groups list? As currently the individual
> scripts will result in "Group already exists" while creating new groups but
> listing with respective utility will not yield the group.
>
> AM5: The KIP defines compatibility considerations for the ListGroups RPC.
> But it's unclear to me why it's needed as the client and server supporting
> `kafka-groups.sh` will be post ListGroups v5 API anyways hence TypesFilter
> will be supported in both client and server. Am I missing something here?
>
> Regards,
> Apoorv Mittal
> +44 7721681581
>
>
> On Wed, Jul 17, 2024 at 6:26 PM Andrew Schofield wrote:
>
>> Hi Lianet,
>> Thanks for your comments.
>>
>> LM5. Unfortunately, the protocol type has to be a string rather than
>> an enumeration. This is because when people have created their own
>> extensions of the classic consumer group protocol, they have chosen
>> their own protocol strings. For example, the Confluent schema registry
>> uses “sr” and there are other examples in the wild.
>>
>> LM6.1. It’s because of the difference between a parameterised
>> type and a raw type.
>>
>> I
[jira] [Resolved] (KAFKA-17121) junit-platform.properties files in published artifacts pollute the test classpath of consumers
[ https://issues.apache.org/jira/browse/KAFKA-17121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-17121.
Fix Version/s: 3.9.0
Resolution: Fixed

> junit-platform.properties files in published artifacts pollute the test
> classpath of consumers
>
> Key: KAFKA-17121
> URL: https://issues.apache.org/jira/browse/KAFKA-17121
> Project: Kafka
> Issue Type: Bug
> Components: clients, core
> Affects Versions: 3.7.1
> Reporter: Andy Wilkinson
> Assignee: kangning.li
> Priority: Major
> Fix For: 3.9.0
>
> https://github.com/apache/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d
> added {{junit-platform.properties}} files to the test-classified
> kafka-clients and kafka-server-common artifacts. When a consumer is using
> these artifacts for its own Kafka-related tests, a warning is logged when the
> tests are launched:
> {code}
> Jul 11, 2024 10:26:21 AM
> org.junit.platform.launcher.core.LauncherConfigurationParameters loadClasspathResource
> WARNING: Discovered 2 'junit-platform.properties' configuration files in the
> classpath; only the first will be used.
> {code}
> The fact that only the first will be used is potentially problematic.
> Depending on the ordering of the classpath, it may result in a consumer's own
> properties being ignored and Kafka's being used instead.
> Can the {{junit-platform.properties}} files be removed from the published
> artifacts? As far as I can tell, they were only intended to affect Kafka's
> own tests.

This message was sent by Atlassian Jira (v8.20.10#820010)
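An affected project can check for this pollution itself. The following is a small diagnostic sketch (not part of Kafka or JUnit) that lists every copy of `junit-platform.properties` visible to the context classloader; more than one copy means JUnit Platform will silently pick whichever comes first on the classpath:

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Diagnostic sketch: list every junit-platform.properties on the classpath.
// If more than one is found, only the first is honoured by JUnit Platform,
// which is exactly the hazard described in KAFKA-17121.
public class JUnitConfigScan {
    public static List<URL> findConfigs() throws IOException {
        return Collections.list(
            Thread.currentThread().getContextClassLoader()
                  .getResources("junit-platform.properties"));
    }

    public static void main(String[] args) throws IOException {
        List<URL> copies = findConfigs();
        System.out.println("Found " + copies.size() + " copies:");
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

Running this inside the affected test suite shows which jar contributes the extra file, making it clear whether a consumer's own configuration is the one being shadowed.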
[jira] [Resolved] (KAFKA-17132) Revisit testMissingOffsetNoResetPolicy for AsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-17132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-17132.
Fix Version/s: 3.9.0
Resolution: Fixed

> Revisit testMissingOffsetNoResetPolicy for AsyncConsumer
>
> Key: KAFKA-17132
> URL: https://issues.apache.org/jira/browse/KAFKA-17132
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Chia-Ping Tsai
> Assignee: TaiJuWu
> Priority: Major
> Fix For: 3.9.0
>
> `testMissingOffsetNoResetPolicy` assumes `poll(0)` can complete the request
> in time, but that is not in spec. Hence, we should increase the timeout to
> enable it for AsyncConsumer
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Andrew,

thanks for getting the discussion going! Here are my responses.

AS1: Good point, done.

AS2: We were planning to add more administrative tools to the interface in a follow-up KIP, to not make this KIP too large. If people think that it would help to understand the overall picture if we already add something like `kafka-streams-groups.sh`, we will do that. I also agree that we should address how this relates to KIP-1043, we'll add it.

AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.

AS4: Thanks, fixed.

AS5: Good catch. This was supposed to mean that we require CREATE on cluster or CREATE on all topics, not both. Fixed.

AS6: Thanks, fixed.

AS7: Thanks, fixed.

AS8: I think this works a bit differently in this KIP than in consumer groups. KIP-848 lets the members vote for a preferred assignor, and the broker-side assignor is picked by majority vote. The `group.consumer.assignors` configuration specifies the list of assignors that are supported on the broker, and is configurable because the interface is pluggable. In this KIP, the task assignor is not voted on by members but configured on the broker side. `group.streams.assignor` is used for this, and uses a specific name. If we make the task assignor pluggable on the broker side, we'd introduce a separate config `group.streams.assignors`, which would indeed be a list of class names. I think there is no conflict here; the two configurations serve different purposes. The only gripe I'd have here is naming, as `group.streams.assignor` and `group.streams.assignors` would be a bit similar, but I cannot really think of a better name for `group.streams.assignor`, so I'd probably rather introduce `group.streams.assignors` as `group.streams.possible_assignors` or something like that.

AS9: I added explanations for the various record types.
Apart from the new topology record and the partition metadata (which is based on the topology and can only be created once we have a topology initialized), the lifecycle for the records is basically identical to that in KIP-848.

AS10: In the consumer offset topic, the version in the key is used to differentiate different schema types with the same content. So the keys are not versioned, but the version field is "abused" as a type tag. This is the same in KIP-848; we followed it for consistency.

Cheers,
Lucas

On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield wrote:
>
> Hi Lucas and Bruno,
>
> Thanks for the great KIP.
>
> I've read through the document and have some initial comments.
>
> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration
> constant. This is a change to the public interface and should be called out.
>
> AS2: Since streams groups are no longer consumer groups, how does the user
> manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh`
> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily
> be extended to support streams groups, but that only lets the user see the
> groups, not manipulate them.
>
> AS3: I wonder whether the streams group state of UNINITIALIZED would be
> better expressed as INITIALIZING.
>
> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
> be nullable.
>
> AS5: Why does StreamsGroupInitialize require CREATE permission on the
> cluster resource? I imagine that this is one of the ways that the request might
> be granted permission to create the StateChangelogTopics and
> RepartitionSourceTopics, but if it is granted permission to create those topics
> with specific ACLs, would CREATE on the cluster resource still be required?
>
> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
>
> AS7: A tiny nit.
> You've used TopologyID (capitals) in StreamsGroupHeartbeatRequest
> and a few others, but in all other cases the fields which are ids are spelled Id.
> I suggest TopologyId.
>
> Also, "interal" is probably meant to be "interval”.
>
> AS8: For consumer groups, the `group.consumer.assignors` configuration is a
> list of class names. The assignors do have names too, but the configuration which
> enables them is in terms of class names. I wonder whether the broker’s
> group.streams.assignor could actually be `group.streams.assignors` and specified
> as a list of the class names of the supplied assignors. I know you're not supporting
> other assignors yet, but when you do, I expect you would prefer to have used class
> names from the start.
>
> The use of assignor names in the other places looks good to me.
>
> AS9: I'd find it really helpful to have a bit of a description about the purpose and
> lifecycle of the 9 record types you've introduced on the __consumer_offsets topic.
> I did a cursory review but without really understanding what's written when,
> I can't do a thorough job of reviewing.
>
> AS10: In the definitions of the record keys, such as
> StreamsGroupCurren
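The AS10 point above — the key's "version" field acting as a record-type tag rather than a schema version — can be illustrated with a small sketch. The tag values and type names below are hypothetical, not Kafka's actual __consumer_offsets key schemas:

```java
import java.nio.ByteBuffer;

// Sketch of AS10: the leading 2-byte "version" of a record key is really a
// type tag — the reader dispatches on it to pick the key schema to decode.
// The tag numbers and names here are made up for illustration only.
public class KeyTypeTag {
    public static String recordType(ByteBuffer key) {
        short tag = key.getShort(); // read the "version" prefix as a type tag
        switch (tag) {
            case 10: return "StreamsGroupTopologyKey";          // hypothetical
            case 11: return "StreamsGroupPartitionMetadataKey"; // hypothetical
            default: return "unknown(" + tag + ")";
        }
    }

    // Build a key carrying only the 2-byte tag prefix.
    public static ByteBuffer keyWithTag(short tag) {
        ByteBuffer buf = ByteBuffer.allocate(2).putShort(tag);
        buf.flip();
        return buf;
    }
}
```

The consequence, as Lucas notes, is that a given key type never evolves via its version number; a new schema simply gets a new tag value.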
[jira] [Resolved] (KAFKA-12877) Fix KRPC files with missing flexibleVersions annotation
[ https://issues.apache.org/jira/browse/KAFKA-12877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-12877.
Fix Version/s: 3.0.0
Resolution: Fixed

> Fix KRPC files with missing flexibleVersions annotation
>
> Key: KAFKA-12877
> URL: https://issues.apache.org/jira/browse/KAFKA-12877
> Project: Kafka
> Issue Type: Bug
> Reporter: Colin McCabe
> Assignee: Colin McCabe
> Priority: Major
> Fix For: 3.0.0
>
> Some KRPC files do not specify their flexibleVersions. Unfortunately, in this
> case, we default to not supporting any flexible versions. This is a poor
> default, since the flexible format is both more efficient (usually) and
> flexible.
> Make flexibleVersions explicit and disallow setting anything except "0+" on
> new RPC and metadata records.
Re: [DISCUSS] KIP-1068: New JMX Metrics for AsyncKafkaConsumer
Hi Brenden,

Thanks for the KIP. The metrics are always helpful.

AM1: Is `application-event-queue-age-avg` enough or do we require `application-event-queue-age-max` as well to differentiate with outliers?

AM2: The Kafka producer defines the metric `record-queue-time-avg`, which captures the time spent in the buffer. Do you think we should have a similar name for `application-event-queue-age-avg`, i.e. change it to `application-event-queue-time-avg`? Moreover, beyond the similar naming, `time` anyway seems more suitable than `age`, though this is minor. The `time` usage is also aligned with the description of this metric.

AM3: The metric `application-event-processing-time` says "the average time, that the consumer network.". Shall we have the `-avg` suffix in the metric as we have defined for other metrics? Also, do we require the max metric as well for the same?

AM4: Is the telemetry name for `unsent-requests-queue-size` intended as `org.apache.kafka.consumer.unsent.requests.size`, or should it be corrected to `org.apache.kafka.consumer.unsent.requests.queue.size`?

Regards,
Apoorv Mittal
+44 7721681581

On Mon, Jul 15, 2024 at 2:45 PM Andrew Schofield wrote:

> Hi Brenden,
> Thanks for the updates.
>
> AS4. I see that you’ve added `.ms` to a bunch of the metrics reflecting the
> fact that they’re measured in milliseconds. However, I observe that most metrics
> in Kafka that are measured in milliseconds, with some exceptions in Kafka Connect
> and MirrorMaker, do not follow this convention. I would tend to err on the side of
> consistency with the existing metrics and not use `.ms`. However, that’s just my
> opinion, so I’d be interested to know what other reviewers of the KIP think.
>
> Thanks,
> Andrew
>
> > On 12 Jul 2024, at 20:11, Brenden Deluna wrote:
> >
> > Hey Lianet,
> >
> > Thank you for your suggestions and feedback!
> >
> > LM1. This has now been addressed.
> >
> > LM2.
> > I think that would be a valuable addition to the current set of
> > metrics, I will get that added.
> >
> > LM3. Again great idea, that would certainly be helpful. Will add that as well.
> >
> > Let me know if you have any more suggestions!
> >
> > Thanks,
> > Brenden
> >
> > On Fri, Jul 12, 2024 at 2:11 PM Brenden Deluna wrote:
> >
> >> Hi Lucas,
> >>
> >> Thank you for the feedback! I have addressed your comments:
> >>
> >> LB1. Good catch there, I will update the names as needed.
> >>
> >> LB2. Good catch again! I will update the name to be more consistent.
> >>
> >> LB3. Thank you for pointing this out, I realized that all metric values
> >> will actually be set to 0. I will specify this and explain why they will
> >> be 0.
> >>
> >> Nit: This metric is referring to the queue of unsent requests in the
> >> NetworkClientDelegate. For the metric descriptions I am trying to not
> >> include too much of the implementation details, hence the reason that
> >> description is quite short. I cannot think of other ways to describe the
> >> metric without going deeper into the implementation, but please do let me
> >> know if you have any ideas.
> >>
> >> Thank you,
> >> Brenden
> >>
> >> On Fri, Jul 12, 2024 at 1:27 PM Lianet M. wrote:
> >>
> >>> Hey Brenden, thanks for the KIP! Great to get more visibility into the new
> >>> consumer.
> >>>
> >>> LM1. +1 on Lucas's suggestion for including the unit in the name, seems
> >>> clearer and consistent (I do see several time metrics including ms)
> >>>
> >>> LM2. What about a new metric for application-event-queue-time-ms. It would
> >>> be a complement to the application-event-queue-size you're proposing, and
> >>> it will tell us how long the events sit in the queue, waiting to be
> >>> processed (from the time the API call adds the event to the queue, to the
> >>> time it's processed in the background thread). I find it would be very
> >>> interesting.
> >>>
> >>> LM3. Thinking about the actual usage of
> >>> "time-between-network-thread-poll-xxx" metric, I imagine it would be
> >>> helpful to know more about what could be impacting it. As I see it, the
> >>> network thread cadence could be mainly impacted by: 1- app event processing
> >>> (generate requests), 2- network client poll (actual send/receive). For 2,
> >>> the new consumer reuses the same component as the legacy one, but 1 is
> >>> specific to the new consumer, so what about a metric
> >>> for application-event-processing-time-ms (we could consider avg I would
> >>> say). It would be the time that the network thread takes to process all
> >>> available events on each run.
> >>>
> >>> Cheers!
> >>> Lianet
> >>>
> >>> On Fri, Jul 12, 2024 at 1:57 PM Lucas Brutschy wrote:
> >>>
> Hey Brenden,
>
> thanks for the KIP! These will be great to observe and debug the
> background thread of the new consumer.
>
> LB1. `time-between-network-thread-poll-max` → I see several similar
> metrics including the uni
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Lucas,

Thanks for your response. All makes sense to me, with just a couple of follow-up comments.

AS8: So, really the broker config is the name of the default assignor used unless it’s overridden by a group config. I have one suggestion, which you can of course ignore: that you use `group.streams.default.assignor.name` on the broker, `group.streams.assignor.name` on the group, `group.remote.assignor.name` on the application, and when you permit pluggable assignors, you use `group.streams.assignors` for the list of class names. I’d prefer if we’d used `group.*.assignor.class.names` for all group types, but that ship has sailed.

AS10: Yes, exactly. In practice, you’ll assign the numbers as you implement the KIP. Looks good to me.

I’ll take another look now the records are more fully described.

Thanks,
Andrew

> On 19 Jul 2024, at 13:46, Lucas Brutschy > wrote: > > Hi Andrew, > > thanks for getting the discussion going! Here are my responses. > > AS1: Good point, done. > > AS2: We were planning to add more administrative tools to the > interface in a follow-up KIP, to not make this KIP too large. If > people think that it would help to understand the overall picture if > we already add something like `kafka-streams-groups.sh`, we will do > that. I also agree that we should address how this relates to > KIP-1043, we'll add it. > > AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc. > > AS4: Thanks, Fixed. > > AS5: Good catch. This was supposed to mean that we require CREATE on > cluster or CREATE on all topics, not both. Fixed. > > AS6: Thanks, Fixed. > > AS7. Thanks, Fixed. > > AS8: I think this works a bit different in this KIP than in consumer > groups. KIP-848 lets the members vote for a preferred assignor, and > the broker-side assignor is picked by majority vote. The > `group.consumer.assignors` specifies the list of assignors that are > supported on the broker, and is configurable because the interface is > pluggable.
In this KIP, the task assignor is not voted on by members > but configured on the broker-side. `group.streams.assignor` is used > for this, and uses a specific name. If we'll make the task assignor > pluggable on the broker-side, we'd introduce a separate config > `group.streams.assignors`, which would indeed be a list of class > names. I think there is no conflict here, the two configurations serve > different purposes. The only gripe I'd have here is naming as > `group.streams.assignor` and `group.streams.assignors` would be a bit > similar, but I cannot really think of a better name for > `group.streams.assignor`, so I'd probably rather introduce > `group.streams.assignors` as `group.streams.possible_assignors` or > something like that. > > AS9: I added explanations for the various record types. Apart from the > new topology record, and the partition metadata (which is based on the > topology and can only be created once we have a topology initialized) > the lifecycle for the records is basically identical as in KIP-848. > > AS10: In the consumer offset topic, the version in the key is used to > differentiate different schema types with the same content. So the > keys are not versioned, but the version field is "abused" as a type > tag. This is the same in KIP-848, we followed it for consistency. > > Cheers, > Lucas > > > On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield > wrote: >> >> Hi Lucas and Bruno, >> >> Thanks for the great KIP. >> >> I've read through the document and have some initial comments. >> >> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration >> constant. This is a change to the public interface and should be called out. >> >> AS2: Since streams groups are no longer consumer groups, how does the user >> manipulate them, observe lag and so on? Will you add >> `kafka-streams-groups.sh` >> or extend `kafka-streams-application-reset.sh`? 
Of course, KIP-1043 can >> easily >> be extended to support streams groups, but that only lets the user see the >> groups, not manipulate them. >> >> AS3: I wonder whether the streams group state of UNINITIALIZED would be >> better expressed as INITIALIZING. >> >> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should >> be nullable. >> >> AS5: Why does StreamsGroupInitialize require CREATE permission on the >> cluster resource? I imagine that this is one of the ways that the request >> might >> be granted permission to create the StateChangelogTopics and >> RepartitionSourceTopics, but if it is granted permission to create those >> topics >> with specific ACLs, would CREATE on the cluster resource still be required? >> >> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED >> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED. >> >> AS7: A tiny nit. You've used TopologyID (capitals) in >> StreamsGroupHeartbeatRequest >> and a few others, but in all other cases the fields which are ids are >> spelled Id. >> I suggest TopologyId
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Lucas, I see that I hit send too quickly. One more comment: AS2: I think stating that there will be a `kafka-streams-group.sh` in a future KIP is fine to keep this KIP focused. Personally, I would probably put all of the gory details in this KIP, but then it’s not my KIP. A future pointer is fine too. Thanks, Andrew > On 19 Jul 2024, at 13:46, Lucas Brutschy > wrote: > > Hi Andrew, > > thanks for getting the discussion going! Here are my responses. > > AS1: Good point, done. > > AS2: We were planning to add more administrative tools to the > interface in a follow-up KIP, to not make this KIP too large. If > people think that it would help to understand the overall picture if > we already add something like `kafka-streams-groups.sh`, we will do > that. I also agree that we should address how this relates to > KIP-1043, we'll add it. > > AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc. > > AS4: Thanks, Fixed. > > AS5: Good catch. This was supposed to mean that we require CREATE on > cluster or CREATE on all topics, not both. Fixed. > > AS6: Thanks, Fixed. > > AS7. Thanks, Fixed. > > AS8: I think this works a bit different in this KIP than in consumer > groups. KIP-848 lets the members vote for a preferred assignor, and > the broker-side assignor is picked by majority vote. The > `group.consumer.assignors` specifies the list of assignors that are > supported on the broker, and is configurable because the interface is > pluggable. In this KIP, the task assignor is not voted on by members > but configured on the broker-side. `group.streams.assignor` is used > for this, and uses a specific name. If we'll make the task assignor > pluggable on the broker-side, we'd introduce a separate config > `group.streams.assignors`, which would indeed be a list of class > names. I think there is no conflict here, the two configurations serve > different purposes. 
The only gripe I'd have here is naming as > `group.streams.assignor` and `group.streams.assignors` would be a bit > similar, but I cannot really think of a better name for > `group.streams.assignor`, so I'd probably rather introduce > `group.streams.assignors` as `group.streams.possible_assignors` or > something like that. > > AS9: I added explanations for the various record types. Apart from the > new topology record, and the partition metadata (which is based on the > topology and can only be created once we have a topology initialized) > the lifecycle for the records is basically identical as in KIP-848. > > AS10: In the consumer offset topic, the version in the key is used to > differentiate different schema types with the same content. So the > keys are not versioned, but the version field is "abused" as a type > tag. This is the same in KIP-848, we followed it for consistency. > > Cheers, > Lucas > > > On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield > wrote: >> >> Hi Lucas and Bruno, >> >> Thanks for the great KIP. >> >> I've read through the document and have some initial comments. >> >> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration >> constant. This is a change to the public interface and should be called out. >> >> AS2: Since streams groups are no longer consumer groups, how does the user >> manipulate them, observe lag and so on? Will you add >> `kafka-streams-groups.sh` >> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can >> easily >> be extended to support streams groups, but that only lets the user see the >> groups, not manipulate them. >> >> AS3: I wonder whether the streams group state of UNINITIALIZED would be >> better expressed as INITIALIZING. >> >> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should >> be nullable. >> >> AS5: Why does StreamsGroupInitialize require CREATE permission on the >> cluster resource? 
I imagine that this is one of the ways that the request >> might >> be granted permission to create the StateChangelogTopics and >> RepartitionSourceTopics, but if it is granted permission to create those >> topics >> with specific ACLs, would CREATE on the cluster resource still be required? >> >> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED >> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED. >> >> AS7: A tiny nit. You've used TopologyID (capitals) in >> StreamsGroupHeartbeatRequest >> and a few others, but in all other cases the fields which are ids are >> spelled Id. >> I suggest TopologyId. >> >> Also, "interal" is probably meant to be "interval”. >> >> AS8: For consumer groups, the `group.consumer.assignors` configuration is a >> list of class names. The assignors do have names too, but the configuration >> which >> enables them is in terms of class names. I wonder whether the broker’s >> group.streams.assignor could actually be `group.streams.assignors` and >> specified >> as a list of the class names of the supplied assignors. I know you're not >> supporting >> other assignors
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Andrew,

AS2: I added a note for now. If others feel strongly about it, we can still add more administrative tools to the KIP - it should not change the overall story significantly.

AS8: "group.streams.assignor.name" sounds good to me to distinguish the config from class names. I'm not sure I like the "default"; to be consistent, we'd then have to call it `group.streams.default.session.timeout.ms` as well. I only added the `.name` on both the broker and group level for now.

AS10: Ah, I misread your comment; now I know what you meant. Good point, fixed (by Bruno).

Cheers,
Lucas

On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield wrote: > > Hi Lucas, > I see that I hit send too quickly. One more comment: > > AS2: I think stating that there will be a `kafka-streams-group.sh` in a > future KIP is fine to keep this KIP focused. Personally, I would probably > put all of the gory details in this KIP, but then it’s not my KIP. A future > pointer is fine too. > > Thanks, > Andrew > > > > On 19 Jul 2024, at 13:46, Lucas Brutschy > > wrote: > > > > Hi Andrew, > > > > thanks for getting the discussion going! Here are my responses. > > > > AS1: Good point, done. > > > > AS2: We were planning to add more administrative tools to the > > interface in a follow-up KIP, to not make this KIP too large. If > > people think that it would help to understand the overall picture if > > we already add something like `kafka-streams-groups.sh`, we will do > > that. I also agree that we should address how this relates to > > KIP-1043, we'll add it. > > > > AS3: Good idea, that's more consistent with `assigning` and `reconciling` > > etc. > > > > AS4: Thanks, Fixed. > > > > AS5: Good catch. This was supposed to mean that we require CREATE on > > cluster or CREATE on all topics, not both. Fixed. > > > > AS6: Thanks, Fixed. > > > > AS7. Thanks, Fixed. > > > > AS8: I think this works a bit different in this KIP than in consumer > > groups.
KIP-848 lets the members vote for a preferred assignor, and > > the broker-side assignor is picked by majority vote. The > > `group.consumer.assignors` specifies the list of assignors that are > > supported on the broker, and is configurable because the interface is > > pluggable. In this KIP, the task assignor is not voted on by members > > but configured on the broker-side. `group.streams.assignor` is used > > for this, and uses a specific name. If we'll make the task assignor > > pluggable on the broker-side, we'd introduce a separate config > > `group.streams.assignors`, which would indeed be a list of class > > names. I think there is no conflict here, the two configurations serve > > different purposes. The only gripe I'd have here is naming as > > `group.streams.assignor` and `group.streams.assignors` would be a bit > > similar, but I cannot really think of a better name for > > `group.streams.assignor`, so I'd probably rather introduce > > `group.streams.assignors` as `group.streams.possible_assignors` or > > something like that. > > > > AS9: I added explanations for the various record types. Apart from the > > new topology record, and the partition metadata (which is based on the > > topology and can only be created once we have a topology initialized) > > the lifecycle for the records is basically identical as in KIP-848. > > > > AS10: In the consumer offset topic, the version in the key is used to > > differentiate different schema types with the same content. So the > > keys are not versioned, but the version field is "abused" as a type > > tag. This is the same in KIP-848, we followed it for consistency. > > > > Cheers, > > Lucas > > > > > > On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield > > wrote: > >> > >> Hi Lucas and Bruno, > >> > >> Thanks for the great KIP. > >> > >> I've read through the document and have some initial comments. > >> > >> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS > >> enumeration > >> constant. 
This is a change to the public interface and should be called > >> out. > >> > >> AS2: Since streams groups are no longer consumer groups, how does the user > >> manipulate them, observe lag and so on? Will you add > >> `kafka-streams-groups.sh` > >> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can > >> easily > >> be extended to support streams groups, but that only lets the user see the > >> groups, not manipulate them. > >> > >> AS3: I wonder whether the streams group state of UNINITIALIZED would be > >> better expressed as INITIALIZING. > >> > >> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should > >> be nullable. > >> > >> AS5: Why does StreamsGroupInitialize require CREATE permission on the > >> cluster resource? I imagine that this is one of the ways that the request > >> might > >> be granted permission to create the StateChangelogTopics and > >> RepartitionSourceTopics, but if it is granted permission to create those > >> topics > >> with specific ACLs, would CREATE on the cluster reso
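Concretely, the AS8 distinction above might look like this in broker configuration — a sketch only, since the streams-side property names and values are still under discussion in the KIP, and the consumer-side default assignor class names shown are from memory:

```properties
# KIP-848 consumer groups: a pluggable LIST of broker-side assignors
# (class names); members vote on which one the group uses.
group.consumer.assignors=org.apache.kafka.coordinator.group.assignor.UniformAssignor,org.apache.kafka.coordinator.group.assignor.RangeAssignor

# This KIP: a SINGLE broker-configured task assignor, selected by name,
# not voted on by members. The value "sticky" is purely illustrative.
group.streams.assignor=sticky
```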
[jira] [Created] (KAFKA-17169) Add an EndpointsTest suite that test all of the public methods
José Armando García Sancio created KAFKA-17169: -- Summary: Add an EndpointsTest suite that test all of the public methods Key: KAFKA-17169 URL: https://issues.apache.org/jira/browse/KAFKA-17169 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-1068: KIP-1068: New JMX Metrics for AsyncKafkaConsumer
Hi Apoorv, Thank you for your comments, I will address each. AM1. I can see the usefulness in also having an 'application-event-queue-age-max' to get an idea of outliers and how they may be affecting the average metric. I will add that. AM2. I agree with you there, I think 'time' is a better descriptor here than 'age'. I will update those metric names as well. AM3. Similar to above comments, I will change the name of that metric to be more consistent. And I think a max metric would also be useful here, adding that. AM4. Yes, good catch there. Will update that as well. Thank you, Brenden On Fri, Jul 19, 2024 at 8:14 AM Apoorv Mittal wrote: > Hi Brendan, > Thanks for the KIP. The metrics are always helpful. > > AM1: Is `application-event-queue-age-avg` enough or do we require ` > application-event-queue-age-max` as well to differentiate with outliers? > > AM2: The kafka producer defines metric `record-queue-time-avg` which > captures the time spent in the buffer. Do you think we should have a > similar name for `application-event-queue-age-avg` i.e. change to ` > application-event-queue-time-avg`? Moreover other than similar naming, > `time` anyways seems more suitable than `age`, though minor. The `time` > usage is also aligned with the description of this metric. > > AM3: Metric `application-event-processing-time` says "the average time, > that the consumer network.". Shall we have the `-avg` suffix in the > metric as we have defined for other metrics? Also do we require the max > metric as well for the same? > > AM4: Is the telemetry name for `unsent-requests-queue-size` intended > as `org.apache.kafka.consumer.unsent.requests.size`, > or it should be corrected to ` > org.apache.kafka.consumer.unsent.requests.queue.size`? > > AM2: > Regards, > Apoorv Mittal > +44 7721681581 > > > On Mon, Jul 15, 2024 at 2:45 PM Andrew Schofield < > andrew_schofi...@live.com> > wrote: > > > Hi Brenden, > > Thanks for the updates. > > > > AS4. 
I see that you’ve added `.ms` to a bunch of the metrics reflecting > the > > fact that they’re measured in milliseconds. However, I observe that most > > metrics > > in Kafka that are measured in milliseconds, with some exceptions in Kafka > > Connect > > and MirrorMaker, do not follow this convention. I would tend to err on the > > side of > > consistency with the existing metrics and not use `.ms`. However, that’s > > just my > > opinion, so I’d be interested to know what other reviewers of the KIP > > think. > > > > Thanks, > > Andrew > > > > > On 12 Jul 2024, at 20:11, Brenden Deluna > > > wrote: > > > > > > Hey Lianet, > > > > > > Thank you for your suggestions and feedback! > > > > > > > > > LM1. This has now been addressed. > > > > > > > > > LM2. I think that would be a valuable addition to the current set of > > > metrics, I will get that added. > > > > > > > > > LM3. Again great idea, that would certainly be helpful. Will add that > as > > > well. > > > > > > > > > Let me know if you have any more suggestions! > > > > > > > > > Thanks, > > > > > > Brenden > > > > > > On Fri, Jul 12, 2024 at 2:11 PM Brenden Deluna > > wrote: > > > > > >> Hi Lucas, > > >> > > >> Thank you for the feedback! I have addressed your comments: > > >> > > >> > > >> LB1. Good catch there, I will update the names as needed. > > >> > > >> > > >> LB2. Good catch again! I will update the name to be more consistent. > > >> > > >> > > >> LB3. Thank you for pointing this out, I realized that all metric > values > > >> will actually be set to 0. I will specify this and explain why they > > will > > >> be 0. > > >> > > >> > > >> Nit: This metric is referring to the queue of unsent requests in the > > >> NetworkClientDelegate. For the metric descriptions I am trying not to > > >> include too much of the implementation details, hence the reason that > > >> description is quite short.
I cannot think of other ways to describe > the > > >> metric without going deeper into the implementation, but please do let > > me > > >> know if you have any ideas. > > >> > > >> > > >> Thank you, > > >> > > >> Brenden > > >> > > >> On Fri, Jul 12, 2024 at 1:27 PM Lianet M. wrote: > > >> > > >>> Hey Brenden, thanks for the KIP! Great to get more visibility into > the > > new > > >>> consumer. > > >>> > > >>> LM1. +1 on Lucas's suggestion for including the unit in the name, > seems > > >>> clearer and consistent (I do see several time metrics including ms) > > >>> > > >>> LM2. What about a new metric for application-event-queue-time-ms. It > > would > > >>> be a complement to the application-event-queue-size you're proposing, > > and > > >>> it will tell us how long the events sit in the queue, waiting to be > > >>> processed (from the time the API call adds the event to the queue, to > > the > > >>> time it's processed in the background thread). I find it would be > very > > >>> interesting. > > >>> > > >>> LM3. Thinking about the actual usage of > > >>> "time-between-network-thread-poll-xxx" metric, I imagine it wou
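On the avg/max question (AM1/AM3 above): an average alone hides outliers that a max surfaces. Below is a minimal, dependency-free sketch of the two aggregations being discussed — the real metrics would be registered through Kafka's metrics library, and the class and method names here are illustrative only:

```java
import java.util.List;

// Illustrative only: tracks the time events spend in the application event
// queue, exposing both "-avg" and "-max" style aggregations.
public class QueueTimeMetrics {
    private long count = 0;
    private double sum = 0.0;
    private double max = Double.NEGATIVE_INFINITY;

    // Record one event's queue residence time in milliseconds.
    public void record(double queueTimeMs) {
        count++;
        sum += queueTimeMs;
        if (queueTimeMs > max) max = queueTimeMs;
    }

    public double avg() { return count == 0 ? Double.NaN : sum / count; }
    public double max() { return count == 0 ? Double.NaN : max; }

    public static void main(String[] args) {
        QueueTimeMetrics m = new QueueTimeMetrics();
        for (double t : List.of(1.0, 2.0, 3.0, 100.0)) m.record(t);
        // avg = 26.5 hides the 100 ms outlier; max = 100.0 surfaces it.
        System.out.println("avg=" + m.avg() + " max=" + m.max());
    }
}
```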
Re: [DISCUSS] KIP-1059: Enable the Producer flush() method to clear the latest send() error
For catching client side errors this would work IMHO. I am ok with this. We throw before we add the record to the batch. Very clean semantics which should also address the concern of "non-atomic tx"... The exception clearly indicates that the record was not added to the TX, and users can react to it accordingly. We did discuss this idea previously, but did not have a good proposal to make it backward compatible. The newly proposed overload would address this issue of backward compatibility. Of course, it might not make it easily extensible in the future for broker side errors, but it's unclear anyway right now, if we would even get to a solution for broker side errors or not -- so maybe it's ok to accept this and drop/ignore the broker side error question for now. A small follow-up thought/question: instead of using a boolean, would we actually want to make it a var-arg enum to allow users to enable this for certain errors explicitly and individually? Besides the added flexibility and fine-grained control, a var-arg enum would also make the API nicer/cleaner IMHO compared to a boolean. For convenience, this enum could have an additional `ALL` option (and we would call out that if `ALL` is used, new error types might be added in a future release making the code less safe/robust -- ie, use at your own risk only) This way, we also explicitly document what exception might be thrown in the KIP, as we would add an enum for each error type explicitly, and also make it future-proof for new error types we want to cover -- each addition would require a KIP to extend the enum. -Matthias On 7/18/24 10:33 PM, Artem Livshits wrote: Hey folks, Hopefully not to make this KIP go for another spin :-), but I thought of a modification that might actually address safety concerns over using flags to ignore a vaguely specified class of errors.
What if we had the following overload of .send method: void send(ProducerRecord record, Callback callback, boolean throwImmediately) and if throwImmediately=false, then we behave the same way as now (return errors via Future and poison transaction) and if throwImmediately=true then we just throw errors immediately from the send function. This way we don't ignore the error, we're just changing the method they are delivered. Then KStreams can catch the error for send(record, callback, true) and do whatever it needs to do. -Artem On Mon, Jul 15, 2024 at 4:30 PM Greg Harris wrote: Matthias, Thank you for rejecting my suggested alternatives. Your responses are the sorts of things I expected to see summarized in the text of the KIP. I agree with most of your rejections, except this one: "Estimation" is not sufficient, but we would need to know it exactly. And that's an impl detail, given that the message format could change and we could add new internal fields increasing the message size. An estimate is certainly going to have an error. But an estimate shouldn't be treated as exact anyway, there should be an error bound, or "safety factor" used when interpreting it. For example, if the broker side limit is 1MB, and an estimate could be wrong by ~10%, then computing an estimate and dropping records >900kb should be sufficient to prevent RTLEs. This is the sort of estimation that I would expect application developers could implement, without knowing the exact serialization and protocol overhead. This could prevent user-originated oversize records from making it to the producer. Thanks, Greg On Mon, Jul 15, 2024 at 4:08 PM Matthias J. Sax wrote: I agree with Alieh and Artem -- in the end, why buffer records twice? We effectively want to allow to push some error handling (which I btw consider "business logic") into the producer. IMHO, there is nothing wrong with it. 
Dropping a poison pill record is not really a violation of atomicity from my POV, but a business logic decision to not include a record in a transaction -- the proposed API just makes it much simpler to achieve this business logic goal. For memory size estimation, throughput or message size is actually not relevant, right? We would need to look at producer buffer size, ie, `batch.size`, `max.in.flight.requests.per.connection` and guesstimate the number of connections there might be? At least for KS, we don't need to buffer everything until commit, but only until we get a successful "ack" back. Note that KS applications not only need to write to (a single) user result topic, but to multiple output topics, as well as repartition and changelog topics, across all tasks assigned to a thread (ie, producer), which can easily be 10 tasks or more. If we assume topics with 30 partitions (topics with 50 or more partitions are not uncommon either), and a producer that must write to 10 different topics, the number of required connections is very quickly very high, and thus the required "application buffer space" would be significant. Your other ideas seem not to be viable alternatives: S
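Matthias's guesstimate above can be made concrete with some back-of-the-envelope arithmetic, using the figures from his example (10 topics, 30 partitions each, and the producer's default 16 KB `batch.size`) — all purely illustrative, and per producer thread:

```java
// Illustrative back-of-the-envelope estimate of producer-side buffering,
// using the kind of numbers mentioned in the discussion above.
public class BufferEstimate {
    public static long worstCaseBytes(int batchSizeBytes,
                                      int topics,
                                      int partitionsPerTopic) {
        // Worst case: one in-progress batch per topic-partition held in the
        // producer accumulator before draining.
        return (long) batchSizeBytes * topics * partitionsPerTopic;
    }

    public static void main(String[] args) {
        int batchSize = 16 * 1024; // producer default batch.size
        // 10 topics x 30 partitions, per producer thread:
        long bytes = worstCaseBytes(batchSize, 10, 30);
        System.out.println(bytes + " bytes (~" + bytes / (1024 * 1024) + " MiB)");
    }
}
```

Multiply by the number of threads, and add in-flight requests per connection, and an application that re-buffers unacked records itself would need to budget considerably more than this.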
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Lucas/Bruno, thanks for the great KIP! First comments: LM1. Related to where the KIP says: “Group ID, member ID, member epoch are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted.” I expect all the other info also needs to be sent whenever a full heartbeat is required (even if it didn’t change from the last heartbeat), ex. on fencing scenarios, correct? LM2. For consumer groups we always send the groupInstanceId (if any) as part of every heartbeat, along with memberId, epoch and groupId. Should we consider that too here? LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to the stream-specific RPCs if the groupId is associated with a group type that is not streams (ie. consumer group or share group). I wonder if at this point, where we're getting several new group types added, each with RPCs that are supposed to include groupId of a certain type, we should be more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE (group exists but not with a valid type for this RPC) vs a GROUP_ID_NOT_FOUND (group does not exist). Those errors would be consistently used across consumer, share, and streams RPCs whenever the group id is not of the expected type. This is truly not specific to this KIP, and should be addressed with all group types and their RPCs in mind. I just wanted to bring out my concern and get thoughts around it. LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if groupId is empty. There is already an INVALID_GROUP_ID error, that seems more specific to this situation. Error handling of specific errors would definitely be easier than having to deal with a generic INVALID_REQUEST (and probably its custom message). I know that for KIP-848 we have INVALID_REQUEST for similar situations, so if we ever go down this path we should review it there too for consistency. Thoughts? LM5.
The dependency between the StreamsGroupHeartbeat RPC and the StreamsGroupInitialize RPC is one-way only, right? HB requires a previous StreamsGroupInitialize request, but StreamsGroupInitialize processing is totally independent of heartbeats (and could perfectly be processed without a previous HB, even though the client implementation we’re proposing won’t go down that path). Is my understanding correct? Just to double-check, seems sensible like that at the protocol level. LM6. With KIP-848, there is an important improvement that brings a difference in behaviour around static membership: with the classic protocol, if a static member joins with a group instance already in use, it makes the initial member fail with a FENCED_INSTANCE_ID exception, vs. with the new consumer group protocol, the second member trying to join fails with an UNRELEASED_INSTANCE_ID. Does this change need to be considered in any way for the streams app? (I'm not familiar with KS yet, but thought it was worth asking. If it doesn't affect it in any way, still maybe helpful to call it out in a section for static membership) LM7. Regarding the admin tool to manage streams groups. We can discuss whether to have it here or separately, but I think we should aim for some basic admin capabilities from the start, mainly because I believe it will be very helpful/needed in practice during the impl of the KIP. From experience with KIP-848, we felt a bit blindfolded in the initial phase where we still didn't have kafka-consumer-groups dealing with the new groups (and then it was very helpful and used when we were able to easily inspect them from the console) LM8. nit: the links to KIP-848 are not quite right (pointing to an unrelated “Future work section” at the end of KIP-848) Thanks! Lianet On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy wrote: > Hi Andrew, > > AS2: I added a note for now.
If others feel strongly about it, we can > still add more administrative tools to the KIP - it should not change > the overall story significantly. > > AS8: "streams.group.assignor.name" sounds good to me to distinguish > the config from class names. Not sure if I like the "default". To be > consistent, we'd then have to call it > `group.streams.default.session.timeout.ms` as well. I only added the > `.name` on both broker and group level for now. > > AS10: Ah, I misread your comment, now I know what you meant. Good > point, fixed (by Bruno). > > Cheers, > Lucas > > On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield > wrote: > > > > Hi Lucas, > > I see that I hit send too quickly. One more comment: > > > > AS2: I think stating that there will be a `kafka-streams-group.sh` in a > > future KIP is fine to keep this KIP focused. Personally, I would probably > > put all of the gory details in this KIP, but then it’s not my KIP. A > future > > pointer is fine too. > > > > Thanks, > > Andrew > > > > > > > On 19 Jul 2024, at 13:46, Lucas Brutschy > wrote: > > > > > > Hi Andrew, > > > > > > thanks for getting the discussion going! Here are my responses
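Lianet's LM3 point above (an explicit wrong-type error vs. GROUP_ID_NOT_FOUND) could be sketched as coordinator-side validation logic. Note that INVALID_GROUP_TYPE is a hypothetical error code proposed in the discussion, not an existing Kafka error; the class below is a simplified illustration, not real coordinator code:

```java
public class GroupErrorSketch {
    enum GroupType { CONSUMER, SHARE, STREAMS }
    enum ApiError { NONE, GROUP_ID_NOT_FOUND, INVALID_GROUP_TYPE }

    // Hypothetical check for a group-type-specific RPC (e.g. StreamsGroupDescribe).
    // 'existing' is the type of the group stored under this group ID, or null
    // if no group with that ID exists at all.
    static ApiError validate(GroupType existing, GroupType expected) {
        if (existing == null) return ApiError.GROUP_ID_NOT_FOUND;    // group does not exist
        if (existing != expected) return ApiError.INVALID_GROUP_TYPE; // exists, wrong type
        return ApiError.NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate(null, GroupType.STREAMS));              // GROUP_ID_NOT_FOUND
        System.out.println(validate(GroupType.SHARE, GroupType.STREAMS));   // INVALID_GROUP_TYPE
        System.out.println(validate(GroupType.STREAMS, GroupType.STREAMS)); // NONE
    }
}
```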
[jira] [Created] (KAFKA-17170) Add test to ensure new consumer acks reconciled assignment even if first HB with ack lost
Lianet Magrans created KAFKA-17170: -- Summary: Add test to ensure new consumer acks reconciled assignment even if first HB with ack lost Key: KAFKA-17170 URL: https://issues.apache.org/jira/browse/KAFKA-17170 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When a consumer reconciles an assignment, it transitions to ACKNOWLEDGING, so that a HB is sent on the next manager poll, without waiting for the interval. The consumer transitions out of this ack state as soon as it sends the heartbeat, without waiting for a response. This is based on the expectation that following heartbeats (sent on the interval) will act as the ack, including the set of partitions even in case the first ack is lost. This is the expected flow:
1. Complete reconciliation and send HB1 to ack assignment tp0.
2. Send following HBs on the interval (they won't include tp0, given that its value did not change since the last HB).
3. HB1 times out (or fails in any way) => the heartbeat request manager resets the sentFields to null (HeartbeatState.reset(), triggered if the request fails, or if it gets a response with an error).
4. The following HB will include tp0 (and act as the ack), because it will notice that tp0 != null (the last value sent).
This seems not to be covered by any test, so we should add a unit test to the HeartbeatRequestManager, to ensure that the HB generated in step 4 above includes tp0 as I expect :) This flow is important because if the reconciled partitions fail to be sent in a HB, the broker would remain waiting for an ack that the member considers already sent (the broker would wait for the rebalance timeout and then re-assign those partitions). -- This message was sent by Atlassian Jira (v8.20.10#820010)
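The flow described in the ticket hinges on "send only changed fields, reset on failure" bookkeeping. Below is a simplified, self-contained model of that behaviour — the class and field names echo the ticket's HeartbeatState/sentFields but are not the real implementation:

```java
import java.util.Objects;
import java.util.Set;

// Simplified model of delta-heartbeat bookkeeping: a field is only included
// in a heartbeat when it differs from the last value sent, and a failed
// heartbeat resets the "last sent" record so the field is re-sent.
public class HeartbeatStateSketch {
    private Set<String> sentAssignment = null; // last assignment sent; null = never sent

    // Returns the assignment to include in the next HB, or null to omit it.
    public Set<String> buildHeartbeat(Set<String> currentAssignment) {
        if (Objects.equals(currentAssignment, sentAssignment)) return null; // unchanged, omit
        sentAssignment = currentAssignment;
        return currentAssignment;
    }

    // HB request failed or got an error response: forget what was sent.
    public void reset() { sentAssignment = null; }

    public static void main(String[] args) {
        HeartbeatStateSketch hb = new HeartbeatStateSketch();
        Set<String> tp0 = Set.of("tp0");
        System.out.println(hb.buildHeartbeat(tp0)); // HB1 acks tp0
        System.out.println(hb.buildHeartbeat(tp0)); // interval HB omits it (null)
        hb.reset();                                 // HB1 timed out / failed
        System.out.println(hb.buildHeartbeat(tp0)); // next HB re-sends tp0 (step 4)
    }
}
```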
Re: [DISCUSS] KIP-1059: Enable the Producer flush() method to clear the latest send() error
Hi Artem and Matthias, > On the other hand, the effort to prove that > keeping all records in memory won't break some scenarios (and generally > breaking one is enough to cause a lot of pain) seems to be significantly > higher than to prove that setting some flag in some API has pretty much 0 > chance of regression > in the end, why buffer records twice? > This way we don't > ignore the error, we're just changing the method they are delivered. > Very clean semantics > which should also address the concern of "non-atomic tx" I feel like my concerns are being minimized instead of being addressed in this discussion, and if that's because I'm not expressing them clearly, I apologize. Many users come to Kafka with prior expectations, especially when we use industry-standard terminology like "Exactly Once Semantics", "Transactions", "Commit", "Abort". Of course Kafka isn't an ACID-compliant database, but users will evaluate, discuss, and develop applications with Kafka through the lens of the ACID principles, because that is the framework most commonly applied to transactional semantics. The original design of KIP-98 [1] explicitly mentions atomic commits (with the same meaning as the A in ACID) as the primary abstraction being added (reproduced here): > At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, ie. all writes to these TopicPartitions will succeed or fail as a unit. > Further, since consumer progress is recorded as a write to the offsets topic, the above capability is leveraged to enable applications to batch consumed and produced messages into a single atomic unit, ie. a set of messages may be considered consumed only if the entire ‘consume-transform-produce’ executed in its entirety.
I think it's important to say that to a user, "writes" really means "send() and commitOffsets() calls", not literal produce requests to Kafka brokers, and "consume-transform-produce" really means "poll(), transform, send()". This is because to a user, the implementation within poll() and send() and the broker are none of their concern, and are intended to be within the abstraction. When I say that this feature is a non-atomic commit, I mean that this feature does not fit the above description, and breaks the transaction abstraction in a meaningful way. No longer do all writes succeed or fail as a unit, some failures are permitted to drop data. No longer must a consume-transform-produce cycle be executed in its entirety, some parts may be left incomplete. This means that this method will be difficult to define ("which exceptions are covered?"), difficult to document ("how do we explain 'not-really-atomic commits' clearly and unambiguously to a potential user?"), and difficult to compose ("if someone turns this option on, how does that affect delivery guarantees and opportunities for bugs in upper layers?"). Users currently rely heavily on analogies to other database systems to make sense of Kafka's transactions, and we need to use that to our benefit, rather than designing in spite of it being true. However this atomicity guarantee isn't always desirable, as evidenced by the original bug report [2]. If you're interacting with a website form for example, and a database transaction fails because one of your strings is oversize, you don't need to re-input all of your form responses from scratch, as there is an application layer/browser in-between to preserve the state and retry the transaction. 
And while you could make a convenience/performance/etc argument in that situation ("The database should truncate/null-out the oversize string") and modern databases often have very expressive DML that would permit such a behavior (try-catch, etc), the End-to-End arguments [3] make me believe that is a bad design and should be discouraged. To that end, I was suggesting ways to push this farther and farther up the stack, such as performing record size estimation. This doesn't mean that it can't be added at a low level of abstraction, just that we need to make sure to exhaust all other alternatives, and justify it with a performance benefit. I was holding off on discussing the literal design until you provided concrete performance justification, but to progress the discussion while I'm waiting for that, I can give my thoughts: I don't think an overloaded send() method is appropriate given that this appears to be a niche use-case, and the send() method itself is probably the single most important method in the Clients library. The KIP-98 design was a much more substantial change to the Producer than this KIP, and it found a way to preserve the original type signature (but added an exception). Users picking up the Producer for the first time may see this additional method, and may spend time trying to understand whether it is something suitable for their use-case. In the best case, they ignore it and use the other two signatures. But it is also possible that they will use
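Greg's record-size-estimation suggestion could take a shape like the following application-level guard — purely illustrative: the 1 MB limit, 90% safety factor, and fixed overhead allowance are assumed values for the sketch, not Kafka constants:

```java
// Application-side pre-check sketch: estimate a record's serialized size
// and reject it before send() if it is near the broker's size limit.
public class RecordSizeGuard {
    private final long brokerLimitBytes;
    private final double safetyFactor; // e.g. 0.9 => reject above 90% of the limit

    public RecordSizeGuard(long brokerLimitBytes, double safetyFactor) {
        this.brokerLimitBytes = brokerLimitBytes;
        this.safetyFactor = safetyFactor;
    }

    // Rough estimate: key + value bytes plus a fixed allowance for
    // record-format/protocol overhead (headers, varints, etc.). The
    // allowance is a guess, which is exactly why the safety factor exists.
    public boolean fits(byte[] key, byte[] value) {
        long overheadAllowance = 512; // illustrative, not the exact wire overhead
        long estimate = (key == null ? 0 : key.length)
                + (value == null ? 0 : value.length)
                + overheadAllowance;
        return estimate <= (long) (brokerLimitBytes * safetyFactor);
    }

    public static void main(String[] args) {
        RecordSizeGuard guard = new RecordSizeGuard(1024 * 1024, 0.9); // 1 MB limit, 10% margin
        byte[] small = new byte[5];
        byte[] big = new byte[950 * 1024]; // ~950 KB, above the ~900 KB threshold
        System.out.println(guard.fits(small, small)); // true
        System.out.println(guard.fits(null, big));    // false
    }
}
```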