[ https://issues.apache.org/jira/browse/KAFKA-2845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15012465#comment-15012465 ]
Geoff Anderson edited comment on KAFKA-2845 at 11/19/15 12:34 AM: ------------------------------------------------------------------ [~gwenshap] That is the behavior we expected... but our discovery when adding these tests was that the 0.8.X brokers *don't* close the connection for at least some requests, and the error occurs on the client side when it tries to parse the response. I looked at NetworkClient.handleCompletedRecieves in 0.8.2.2, and maybe I'm missing something, but I don't see what appears to be a version check there? It seems like ProtoUtils.currentResponseSchema just finds the latest version for the given apiKey? {code:title=NetworkClient.java(0.8.2.2)} private void handleCompletedReceives(List<ClientResponse> responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { int source = receive.source(); ClientRequest req = inFlightRequests.completeNext(source); ResponseHeader header = ResponseHeader.parse(receive.payload()); short apiKey = req.request().header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request().header(), header); if (apiKey == ApiKeys.METADATA.id) { handleMetadataResponse(req.request().header(), body, now); } else { // need to add body/header to response here responses.add(new ClientResponse(req, now, false, body)); } } } {code} {code:title=ProtoUtils.java(0.8.2.2)} public static Schema currentResponseSchema(int apiKey) { return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey)); } {code} was (Author: granders): [~gwenshap] That is the behavior we expected... but our discovery when adding these tests was that the 0.8.X brokers *don't* close the connection for at least some requests, and the error occurs on the client side when it tries to parse the response. I looked at NetworkClient.handleCompletedRecieves in 0.8.2.2, and maybe I'm missing something, but I don't see what appears to be a version check there? It seems like ProtoUtils.currentResponseSchema just finds the latest version for the given apiKey? {code:title=NetworkClient.java} private void handleCompletedReceives(List<ClientResponse> responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { int source = receive.source(); ClientRequest req = inFlightRequests.completeNext(source); ResponseHeader header = ResponseHeader.parse(receive.payload()); short apiKey = req.request().header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request().header(), header); if (apiKey == ApiKeys.METADATA.id) { handleMetadataResponse(req.request().header(), body, now); } else { // need to add body/header to response here responses.add(new ClientResponse(req, now, false, body)); } } } {code} {code:title=ProtoUtils.java} public static Schema currentResponseSchema(int apiKey) { return schemaFor(Protocol.RESPONSES, apiKey, latestVersion(apiKey)); } {code} > Add 0.9 clients vs 0.8 brokers compatibility test > ------------------------------------------------- > > Key: KAFKA-2845 > URL: https://issues.apache.org/jira/browse/KAFKA-2845 > Project: Kafka > Issue Type: Task > Reporter: Geoff Anderson > Assignee: Geoff Anderson > > Add a simple test or two to document and understand what behavior to expect > if users try to run 0.9 java producer or 0.9 scala consumer ("old consumer") > against an 0.8.X broker cluster -- This message was sent by Atlassian JIRA (v6.3.4#6332)