[ 
https://issues.apache.org/jira/browse/KAFKA-20624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy resolved KAFKA-20624.
------------------------------------
    Resolution: Fixed

> Wire topology description plugin into GroupCoordinatorService — read path 
> (describe) and broker wiring
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20624
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20624
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Alieh Saeedi
>            Assignee: TengYao Chi
>            Priority: Major
>             Fix For: 4.4.0
>
>
> streamsGroupDescribe gains includeTopologyDescription flag. 
> attachTopologyDescriptions: call plugin.getTopology only when 
> StoredTopologyEpoch == currentTopologyEpoch, otherwise report NOT_STORED. 
> Status constants: NOT_REQUESTED (0), NOT_STORED (1), ERROR (2), AVAILABLE 
> (3). GroupCoordinator interface update. Broker wiring: KafkaApis.scala (new 
> handler + describe flag plumbing), BrokerServer.scala (plugin instantiation + 
> configuration), RequestConvertToJson. GroupCoordinatorServiceTest 
> (describe-path tests), KafkaApisTest, RequestQuotaTest, 
> AuthorizerIntegrationTest, checkstyle/suppressions.xml.
> Wire the streams-group topology description plugin into broker-side request
> handling, completing the KAFKA-20618 broker work that KAFKA-20620 (schema +
> stubs) and KAFKA-20623 (coordinator-side write/heartbeat/delete) left for
> this ticket.
> Two paths are wired:
> 1. Describe path (read). streamsGroupDescribe is extended with an
>    IncludeTopologyDescription request flag. When the flag is set and the
>    persisted StoredDescriptionTopologyEpoch matches the group's current
>    topology epoch, a new StreamsGroupTopologyDescriptionManager.
>    attachTopologyDescriptions building block calls
>    plugin.getTopology(groupId, topologyEpoch) per group and populates the
>    response's topology field. Each DescribedGroup carries a
>    topologyDescriptionStatus byte: NOT_REQUESTED (0) when the client did
>    not ask for it, NOT_STORED (1) when the plugin has no row at the
>    requested epoch (or the epoch mismatched), ERROR (2) when the plugin
>    call failed, AVAILABLE (3) when the topology is attached. Chain
>    assembly lives on GroupCoordinatorService.streamsGroupDescribe; the
>    manager exposes per-group plugin invocation as a building block
>    (mirroring invokeSetTopology / invokeDeleteTopologies).
> 2. Update path (write — KafkaApis wiring).
>    KafkaApis.handleStreamsGroupTopologyDescriptionUpdate currently lives as
>    a placeholder stub that always returns UNSUPPORTED_VERSION (scaffolding
>    from KAFKA-20620). Replace it with the real handler: gate on
>    isStreamsGroupProtocolEnabled (returns UNSUPPORTED_VERSION when the
>    streams group protocol is disabled), authorize READ on GROUP for the
>    request's groupId (returns GROUP_AUTHORIZATION_FAILED if denied — per
>    KIP-1331, "like offset commits, we don't consider this a modification
>    of the GROUP, which allows deploying apps with READ ACLs"), then call
>    groupCoordinator.streamsGroupTopologyDescriptionUpdate(...) and send
>    the result via requestHelper.sendMaybeThrottle. Same shape as the
>    sibling handleStreamsGroupHeartbeat. The coordinator-side method was
>    added in KAFKA-20623 split 2 (#22552); only the KafkaApis side is
>    missing — flagged on the split 2 review.
> Broker wiring (shared). BrokerServer.scala instantiates the plugin via
> config.getConfiguredInstance(...) and threads it into the
> GroupCoordinatorService builder (verify the describe path doesn't need
> additional wiring beyond what KAFKA-20623 already added).
> RequestConvertToJson adds entries for StreamsGroupTopologyDescriptionUpdate
> Request/Response and for the new topology-description fields on the
> describe response so request/response logging tools can render them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to