Magnus Vojbacke created KAFKA-1779: -------------------------------------- Summary: FetchRequest.maxWait has no effect Key: KAFKA-1779 URL: https://issues.apache.org/jira/browse/KAFKA-1779 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.2 Reporter: Magnus Vojbacke Priority: Minor Fix For: 0.8.2
Setting the maxWait field in a kafka.api.FetchRequest does not appear to have any effect. Whereas my assumption is: If I send a fetch request for messages after offset X for a partition where there are currently no messages with offsets after X, I would expect that a Fetch request built with the maxWait option should block on the broker side for $maxWait milliseconds for a new message to arrive. Currently, the request seems to return an empty result immediately. As a result, our client is forced to manually sleep on each fetch request that returns empty results. On the mailing list, it was stated that this bug should be fixed in 0.8.2, but I'm still seeing this issue on 0.8.2-beta. {code} // Chose a suitable topic / partition / lead broker combination val host = ??? val port = ??? val topic = ??? val partition = ??? val cons = new SimpleConsumer(host, port, 50000000, 50000000, "my-id") val topicAndPartition = new TopicAndPartition(topic, partition) println(topicAndPartition) val requestInfo = Map(topicAndPartition -> new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime, 1)) println(requestInfo) val request = new OffsetRequest(requestInfo) println(request) val response: OffsetResponse = cons.getOffsetsBefore(request) println("code=" + response.partitionErrorAndOffsets(topicAndPartition).error) println(response) val offset = response.partitionErrorAndOffsets(topicAndPartition).offsets(0) val req = new FetchRequestBuilder().clientId("my-id").addFetch(topic, partition, offset, 500000).maxWait(10000) // The following requests appear to return within a few hundred milliseconds of // eachother, but my assumption is that maxWait 10000 milliseconds should // make each request block for at least 10000 milliseconds before returning an // empty result. println(System.currentTimeMillis + " " + cons.fetch(req.build())) println(System.currentTimeMillis + " " + cons.fetch(req.build())) println(System.currentTimeMillis + " " + cons.fetch(req.build())) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)