Hi, I want to resurface this thread because I'm still facing issues with our Samza job not receiving events.
Our Samza job metric "SamzaContainerMetrics.process-calls" dropped to zero again today, and so did "SamzaContainerMetrics.process-envelopes" (of course). The current topic offset and task checkpoint suggest everything looks fine:

Topic partition 18 offset (as of now) = *488986*
Current checkpoint for taskname Partition 18:
tasknames.Partition 18.systems.kafka.streams.nogoalids.partitions.18 = *474222*

Even after redeploying the job, everything still seemed stuck :( Any ideas that could help me debug this would be appreciated.

On Wed, Mar 16, 2016 at 4:19 PM, David Yu <david...@optimizely.com> wrote:

> No. Instead, I updated the checkpoint topic with the "upcoming" offsets.
> (I should have done a check before that, though.)
>
> So a related question: if I delete the checkpoint topic from Kafka, that
> would essentially clear out all the offset info, and Samza would be able
> to recreate the topic with the latest offsets (e.g. smallest). Is that
> correct? I just want to find an easy way to do a "reprocess all" kind of
> operation.
>
> Thanks.
>
> On Wed, Mar 16, 2016 at 3:25 PM, Navina Ramesh
> <nram...@linkedin.com.invalid> wrote:
>
>> Strange. I am unable to comment on the behavior because I don't know
>> what your checkpoints looked like in the checkpoint topic.
>>
>> Did you try reading the checkpoint topic log?
>>
>> If you set systems.kafka.streams.nogoalids.samza.reset.offset = true,
>> you are essentially ignoring checkpoints for that stream. Do verify that
>> you are reading from the correct offset in the stream :)
>>
>> Thanks!
>> Navina
>>
>> On Wed, Mar 16, 2016 at 3:16 PM, David Yu <david...@optimizely.com>
>> wrote:
>>
>> > Finally seeing events flowing again.
>> >
>> > Yes, the "systems.kafka.consumer.auto.offset.reset" option is probably
>> > not a factor here. And yes, I am using checkpointing (Kafka). Not sure
>> > if the offsets are messed up.
>> > But I was able to use
>> > "systems.kafka.streams.nogoalids.samza.reset.offset=true" to reset the
>> > offsets to the newest ones. After that, events started coming. Still,
>> > it is unclear to me how things got stuck in the first place.
>> >
>> > On Wed, Mar 16, 2016 at 2:31 PM, Navina Ramesh
>> > <nram...@linkedin.com.invalid> wrote:
>> >
>> > > Hi David,
>> > > The configuration you have tweaked
>> > > (systems.kafka.consumer.auto.offset.reset) is honored only when one
>> > > of the following conditions holds:
>> > > * the topic doesn't exist
>> > > * the checkpoint is older than the maximum message history retained
>> > >   by the brokers
>> > >
>> > > So, my questions are:
>> > > Are you using checkpointing? If you are, you can read the checkpoint
>> > > topic to see the offset that is being used to fetch data.
>> > >
>> > > If you are not using checkpoints, then Samza uses
>> > > systems.kafka.samza.offset.default to decide whether to start reading
>> > > from the earliest (oldest data) or upcoming (newest data) offset in
>> > > the stream.
>> > >
>> > > This could explain where your job is trying to consume from, and you
>> > > can cross-check with the broker.
>> > > For debugging purposes, you can print a debug line in the process()
>> > > method with the offset of the message you are processing
>> > > (message.getOffset). Please remember to remove the debug line after
>> > > troubleshooting; otherwise you risk filling up your logs.
>> > >
>> > > Let me know if you have more questions.
>> > >
>> > > Thanks!
>> > > Navina
>> > >
>> > > On Wed, Mar 16, 2016 at 2:12 PM, David Yu <david...@optimizely.com>
>> > > wrote:
>> > >
>> > > > I'm trying to debug our Samza job, which seems to be stuck
>> > > > consuming from our Kafka stream.
>> > > >
>> > > > Every time I redeploy the job, only the same handful of events get
>> > > > consumed, and then no more events get processed.
>> > > > I manually checked to make sure the input stream is live and
>> > > > flowing. I also tried both of the following:
>> > > >
>> > > > systems.kafka.consumer.auto.offset.reset=largest
>> > > > systems.kafka.consumer.auto.offset.reset=smallest
>> > > >
>> > > > I'm also seeing the following in the log:
>> > > >
>> > > > ... partitionMetadata={Partition
>> > > > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
>> > > > newestOffset=202708, upcomingOffset=202709], Partition
>> > > > [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
>> > > > newestOffset=200521, upcomingOffset=200522], ...
>> > > >
>> > > > Not sure what other ways I could diagnose this problem. Any
>> > > > suggestion is appreciated.
>> > >
>> > > --
>> > > Navina R.
>>
>> --
>> Navina R.
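[Editor's note for archive readers: the offsets quoted in this thread line up in a way worth spelling out. A small self-contained sketch below restates the figures already in the messages above; the variable names are illustrative, and the `upcoming = newest + 1` relationship is as Kafka reports it in the partitionMetadata line (upcomingOffset is where the next produced message will land).]

```java
public class Main {
    public static void main(String[] args) {
        // From the partitionMetadata log line for partition 0:
        long oldest = 144907;    // oldest message still retained by the broker
        long newest = 202708;    // last message currently in the partition
        long upcoming = 202709;  // offset the next produced message will get

        // upcomingOffset is newestOffset + 1
        System.out.println("upcoming - newest = " + (upcoming - newest));

        // From the opening message: head of partition 18 vs. stored checkpoint
        long topicOffset = 488986;
        long checkpoint = 474222;
        System.out.println("messages behind = " + (topicOffset - checkpoint));
    }
}
```

In other words, the checkpoint in the opening message trails the head of partition 18 by roughly 14.7k messages, which is consistent with the process-calls metric sitting at zero while the checkpoint stays frozen.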