Hi David,

The configuration you tweaked (systems.kafka.consumer.auto.offset.reset) is honored only when one of the following conditions holds:

* the topic doesn't exist
* the checkpointed offset is older than the oldest message still retained by the brokers

So, my questions are: Are you using checkpointing? If you are, you can read the checkpoint topic to see the offset that is actually being used to fetch data. If you are not using checkpoints, then Samza uses systems.kafka.samza.offset.default to decide whether to start reading from the oldest offset (earliest data) or the upcoming offset (newest data) in the stream. That should tell you where your job is trying to consume from, and you can cross-check it against the broker.

For debugging, you can add a debug line in your process() method that prints the offset of the message you are processing (envelope.getOffset()). Please remember to remove the debug line after troubleshooting; otherwise you risk filling up your logs.
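For example, here is a minimal sketch of such a debug line in a StreamTask (the class name and log format are just placeholders; adapt this to your own task):

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class OffsetDebugTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Temporary debug line: shows exactly which partition/offset is being consumed.
    // Remove after troubleshooting to avoid flooding the logs.
    System.out.println("Consumed " + envelope.getSystemStreamPartition()
        + " at offset " + envelope.getOffset());
  }
}

Comparing the offsets it prints against the oldestOffset/newestOffset values in the partitionMetadata log line you pasted should show whether the job is stuck at a stale position.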
Let me know if you have more questions.

Thanks!
Navina

On Wed, Mar 16, 2016 at 2:12 PM, David Yu <david...@optimizely.com> wrote:

> I'm trying to debug our Samza job, which seems to be stuck consuming
> from our Kafka stream.
>
> Every time I redeploy the job, only the same handful of events get
> consumed, and then no more events get processed. I manually checked to
> make sure the input stream is live and flowing. I also tried both of the
> following:
>
> systems.kafka.consumer.auto.offset.reset=largest
> systems.kafka.consumer.auto.offset.reset=smallest
>
> I'm also seeing the following in the log:
>
> ... partitionMetadata={Partition
> [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
> newestOffset=202708, upcomingOffset=202709], Partition
> [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
> newestOffset=200521, upcomingOffset=200522], ...
>
> Not sure what other ways I could diagnose this problem. Any suggestion is
> appreciated.

--
Navina R.