[
https://issues.apache.org/jira/browse/KAFKA-2845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012465#comment-15012465
]
Geoff Anderson edited comment on KAFKA-2845 at 11/19/15 12:34 AM:
------------------------------------------------------------------
[~gwenshap] That is the behavior we expected... but our discovery when adding
these tests was that the 0.8.X brokers *don't* close the connection for at
least some requests, and the error occurs on the client side when it tries to
parse the response.
I looked at NetworkClient.handleCompletedReceives in 0.8.2.2, and maybe I'm
missing something, but I don't see anything that looks like a version check
there. It seems like ProtoUtils.currentResponseSchema just finds the latest
version for the given apiKey?
{code:title=NetworkClient.java(0.8.2.2)}
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        int source = receive.source();
        ClientRequest req = inFlightRequests.completeNext(source);
        ResponseHeader header = ResponseHeader.parse(receive.payload());
        short apiKey = req.request().header().apiKey();
        Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
        correlate(req.request().header(), header);
        if (apiKey == ApiKeys.METADATA.id) {
            handleMetadataResponse(req.request().header(), body, now);
        } else {
            // need to add body/header to response here
            responses.add(new ClientResponse(req, now, false, body));
        }
    }
}
{code}
{code:title=ProtoUtils.java(0.8.2.2)}
public static Schema currentResponseSchema(int apiKey) {
    return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey));
}
{code}
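To illustrate the failure mode I suspect, here is a minimal standalone sketch. It does not use the Kafka classes; the two response layouts, the class name SchemaVersionSketch, and its helper methods are all hypothetical. It only shows what can happen when a payload written in an older layout is read with the latest schema instead of the version the peer actually used.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class SchemaVersionSketch {
    // Assumption: the client always knows exactly two versions, 0 and 1.
    static final int LATEST_VERSION = 1;

    // Hypothetical v0 layout: [int16 errorCode]
    // Hypothetical v1 layout: [int32 throttleTimeMs][int16 errorCode]
    // An older broker only knows how to write the v0 layout.
    static ByteBuffer encodeV0(short errorCode) {
        ByteBuffer buf = ByteBuffer.allocate(2);
        buf.putShort(errorCode);
        buf.flip();
        return buf;
    }

    // Reads the error code according to the layout of the given version.
    static short parseErrorCode(ByteBuffer payload, int version) {
        if (version >= 1) {
            payload.getInt(); // v1 prepends throttleTimeMs
        }
        return payload.getShort();
    }

    // Parses a v0 payload with the given schema version and reports the outcome.
    static String tryParse(int version) {
        try {
            short code = parseErrorCode(encodeV0((short) 3), version);
            return "ok errorCode=" + code;
        } catch (BufferUnderflowException e) {
            // The payload is shorter than the schema expects.
            return "parse failure";
        }
    }

    public static void main(String[] args) {
        System.out.println("matching version: " + tryParse(0));
        System.out.println("latest version:   " + tryParse(LATEST_VERSION));
    }
}
```

With the matching version the payload parses; with the latest version the read runs off the end of the buffer. That is the kind of client-side error I would expect if the schema is chosen by latestVersion(apiKey) alone.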
> Add 0.9 clients vs 0.8 brokers compatibility test
> -------------------------------------------------
>
> Key: KAFKA-2845
> URL: https://issues.apache.org/jira/browse/KAFKA-2845
> Project: Kafka
> Issue Type: Task
> Reporter: Geoff Anderson
> Assignee: Geoff Anderson
>
> Add a simple test or two to document and understand what behavior to expect
> if users try to run 0.9 java producer or 0.9 scala consumer ("old consumer")
> against an 0.8.X broker cluster