Magnus Vojbacke created KAFKA-1779:
--------------------------------------
Summary: FetchRequest.maxWait has no effect
Key: KAFKA-1779
URL: https://issues.apache.org/jira/browse/KAFKA-1779
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8.2
Reporter: Magnus Vojbacke
Priority: Minor
Fix For: 0.8.2
Setting the maxWait field in a kafka.api.FetchRequest does not appear to have
any effect. My assumption is as follows: if I send a fetch request for messages
after offset X on a partition that currently has no messages with offsets after
X, a fetch request built with the maxWait option should block on the broker
side for up to $maxWait milliseconds, waiting for a new message to arrive.
Currently, the request seems to return an empty result immediately. As a
result, our client is forced to manually sleep on each fetch request that
returns empty results.
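For illustration, the sleep-on-empty workaround looks roughly like the sketch
below. The names fetchWithSleep, fetchOnce, sleepMs and attemptsLeft are
hypothetical, and fetchOnce stands in for a call such as cons.fetch(...)
followed by extracting the messages; it is a minimal sketch, not the actual
client code.
{code}
import scala.annotation.tailrec

object SleepOnEmptyFetch {
  // Calls fetchOnce until it returns a non-empty result or the attempts run
  // out, sleeping sleepMs on the client side between attempts.
  // fetchOnce is a hypothetical stand-in for a SimpleConsumer fetch.
  @tailrec
  def fetchWithSleep[A](fetchOnce: () => Seq[A], sleepMs: Long, attemptsLeft: Int): Seq[A] = {
    val messages = fetchOnce()
    if (messages.nonEmpty || attemptsLeft <= 1) messages
    else {
      Thread.sleep(sleepMs)
      fetchWithSleep(fetchOnce, sleepMs, attemptsLeft - 1)
    }
  }

  def main(args: Array[String]): Unit = {
    // Stub fetch: empty on the first two calls, one message on the third.
    var calls = 0
    val stub = () => { calls += 1; if (calls < 3) Seq.empty[String] else Seq("m1") }
    println(fetchWithSleep(stub, 100L, 5))
  }
}
{code}
If maxWait worked as I expect, the client-side Thread.sleep would be
unnecessary, since the broker would hold the request instead.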
On the mailing list, it was stated that this bug should be fixed in 0.8.2, but
I'm still seeing this issue on 0.8.2-beta.
{code}
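import kafka.api.{FetchRequestBuilder, OffsetRequest, OffsetResponse, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Choose a suitable topic / partition / lead broker combination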
val host = ???
val port = ???
val topic = ???
val partition = ???
val cons = new SimpleConsumer(host, port, 50000000, 50000000, "my-id")
val topicAndPartition = new TopicAndPartition(topic, partition)
println(topicAndPartition)
val requestInfo = Map(topicAndPartition ->
  new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime, 1))
println(requestInfo)
val request = new OffsetRequest(requestInfo)
println(request)
val response: OffsetResponse = cons.getOffsetsBefore(request)
println("code=" + response.partitionErrorAndOffsets(topicAndPartition).error)
println(response)
val offset = response.partitionErrorAndOffsets(topicAndPartition).offsets(0)
val req = new FetchRequestBuilder().clientId("my-id")
  .addFetch(topic, partition, offset, 500000).maxWait(10000)
// The following requests appear to return within a few hundred milliseconds of
// each other, but my assumption is that a maxWait of 10000 milliseconds should
// make each request block for at least 10000 milliseconds before returning an
// empty result.
println(System.currentTimeMillis + " " + cons.fetch(req.build()))
println(System.currentTimeMillis + " " + cons.fetch(req.build()))
println(System.currentTimeMillis + " " + cons.fetch(req.build()))
{code}