[
https://issues.apache.org/jira/browse/KAFKA-14421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yws reassigned KAFKA-14421:
---------------------------
Assignee: yws
> OffsetFetchRequest throws NPE Exception
> ---------------------------------------
>
> Key: KAFKA-14421
> URL: https://issues.apache.org/jira/browse/KAFKA-14421
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
> Reporter: yws
> Assignee: yws
> Priority: Major
> Attachments: image-2022-11-27-22-28-52-165.png,
> image-2022-11-27-22-41-45-358.png
>
>
> when I use 0.10.2 client send Metadata request to 0.10.0 server, NPE
> exception happens,
> !image-2022-11-27-22-28-52-165.png!
> the NPE exception quite confused me, because if just send Metadata request
> doest not cause the NPE exception occurs, after troubleshooting the problem,
> It is the NetworkClient#poll call ConsumerNetworkClient#trySend and further
> call NetworkClient#doSend when trying to build OffsetFetchRequest, because
> the 0.10.0 server doest not support fetch all TopicPartitions, it throw
> UnsupportedVersionException,
> {code:java}
> private void doSend(ClientRequest clientRequest, boolean isInternalRequest,
> long now) {
> String nodeId = clientRequest.destination();
> ......
> AbstractRequest request = null;
> AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
> try {
> NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
> // Note: if versionInfo is null, we have no server version
> information. This would be
> // the case when sending the initial ApiVersionRequest which
> fetches the version
> // information itself. It is also the case when
> discoverBrokerVersions is set to false.
> if (versionInfo == null) {
> if (discoverBrokerVersions && log.isTraceEnabled())
> log.trace("No version information found when sending
> message of type {} to node {}. " +
> "Assuming version {}.", clientRequest.apiKey(),
> nodeId, builder.version());
> } else {
> short version =
> versionInfo.usableVersion(clientRequest.apiKey());
> builder.setVersion(version);
> }
> // The call to build may also throw UnsupportedVersionException,
> if there are essential
> // fields that cannot be represented in the chosen version.
> request = builder.build();
> } catch (UnsupportedVersionException e) {
> // If the version is not supported, skip sending the request over
> the wire.
> // Instead, simply add it to the local queue of aborted requests.
> log.debug("Version mismatch when attempting to send {} to {}",
> clientRequest.toString(), clientRequest.destination(), e);
> ClientResponse clientResponse = new
> ClientResponse(clientRequest.makeHeader(),
> clientRequest.callback(), clientRequest.destination(),
> now, now,
> false, e, null);
> abortedSends.add(clientResponse);
> return;
> }
> {code}
> !image-2022-11-27-22-41-45-358.png!
> until now, all are expected, but unfortunately, in catch
> UnsupportedVersionException code block, clientRequest.toString need to call
> requestBuilder#toString, that is OffsetFetchRequest's Builder#toString, when
> partition is ALL_TOPIC_PARTITIONS, it is null, therefore it cause the
> unexpected NPE, and make the normal MetadataRequest failed..
> {code:java}
> catch (UnsupportedVersionException e) {
>
> log.debug("Version mismatch when attempting to send {} to {}",
> clientRequest.toString(), clientRequest.destination(), e);
> ClientResponse clientResponse = new
> ClientResponse(clientRequest.makeHeader(),
> clientRequest.callback(), clientRequest.destination(),
> now, now,
> false, e, null);
> abortedSends.add(clientResponse);
> return;
> }
> ClientRequest#toString()
> public String toString() {
> return "ClientRequest(expectResponse=" + expectResponse +
> ", callback=" + callback +
> ", destination=" + destination +
> ", correlationId=" + correlationId +
> ", clientId=" + clientId +
> ", createdTimeMs=" + createdTimeMs +
> ", requestBuilder=" + requestBuilder +
> ")";
> }
> OffsetFetchRequest's Builder#toString
> public String toString() {
> StringBuilder bld = new StringBuilder();
> bld.append("(type=OffsetFetchRequest, ").
> append("groupId=").append(groupId).
> append(", partitions=").append(Utils.join(partitions,
> ",")). // cause NPE
> append(")");
> return bld.toString();
> }
> {code}
> I think the NPE is unexpected, when broker doest not support specific
> protocal, It should not throw NPE instead of UnsupportedVersionException,
> and I find in 0.11 or later version
> it is fixed, but the OffsetFetchRequest support ALL_TOPIC_PARTITIONS is
> introduced in 0.10.2
> [kIP88|https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update],
> Therefore, I think it should also be fixed in 0.10.2 client
> look forward to any reply , Thanks~
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)