Hi,

I may be confusing the concepts here, but I don’t understand your answer.

I thought that the reason for making a PartitionOffsetRequest was to determine 
which offset to use when fetching messages? Also, how is message.max.bytes 
related?

I’m using the Simple Consumer API, and I'm at the point where I want to start 
consuming a partition. Before I make my first message fetch request, I want to 
determine the latest offset. (I’m not taking into account what my consumer has 
consumed previously; I just want to start at the tail of the partition.)
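
As a rough sketch of that lookup, assuming the same 0.8 javaapi classes used in the original message quoted below (the helper name latestOffset is hypothetical, and this has not been run against a broker):

```scala
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.OffsetRequest
import kafka.javaapi.consumer.SimpleConsumer

// Hypothetical helper: ask the broker for the log-end offset of one partition.
def latestOffset(conn: SimpleConsumer, topic: String, partition: Int, clientName: String): Long = {
  val topicAndPartition = new TopicAndPartition(topic, partition)
  val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
  // LatestTime asks for the tail of the log; 1 = return at most one offset.
  requestInfo.put(topicAndPartition,
    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime, 1))

  val request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
  val response = conn.getOffsetsBefore(request)

  if (response.hasError)
    throw new RuntimeException(
      "offset request failed, error code " + response.errorCode(topic, partition))

  // The first (and only) returned offset is where the first fetch would start.
  response.offsets(topic, partition)(0)
}
```

The returned value would then be used as the offset of the first fetch request for that partition.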


/Magnus

On 16 Oct 2014, at 19:11, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Do you see any errors on the broker?
> Are you sure that the consumer's fetch offset is set higher than the
> largest message in your topic? It should be higher than message.max.bytes
> on the broker (which defaults to 1MB).
> 
> On Thu, Oct 16, 2014 at 3:56 AM, Magnus Vojbacke <
> magnus.vojba...@digitalroute.com> wrote:
> 
>> Hi,
>> 
>> I’m trying to make a request for offset information from my broker, and I
>> get a kafka.common.UnknownException as the result.
>> 
>> I’m trying to use the Simple Consumer API
>> 
>> 
>> 
>>        val topicAndPartition = new TopicAndPartition("topic3", 0)
>>        val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
>>        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))
>> 
>>        val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
>> 
>>        import kafka.javaapi._
>>        // conn: kafka.javaapi.consumer.SimpleConsumer
>>        val response: OffsetResponse = conn.getOffsetsBefore(request)
>> 
>>        println("got response [" + response + "]")
>> 
>> 
>> 
>> Output:
>> got response [OffsetResponse(0,Map([test3,1] -> error:
>> kafka.common.UnknownException offsets: 0))]
>> 
>> 
>> I really can’t figure out why I’m getting this response. As far as I know,
>> “topic3” with partition “0” exists on the broker, and I can use
>> bin/kafka-console-consumer.sh to consume from it without any problems.
>> 
>> 
>> Does anyone have an idea what could cause this exception?
>> 
>> As it is right now, I’m not even sure if the request gets to the broker.
>> Is there any way of activating more verbose logs on the broker?
>> 
>> I think I’m using a trunk build (2.10-0.8.3-SNAPSHOT)
>> 
>> 
>> BR
>> /Magnus
>> 
>> 
