It must have been junk data. I started using a new topic for everything (metrics/source/dest) and data is flowing fine now. When I switch it back to the other topics I get the hung behavior with nothing erroring out in the logs.
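One hypothesis worth checking here (an editor's guess based on how Samza handles offsets, not something confirmed in this thread): checkpointed offsets take precedence over the Kafka consumer's auto.offset.reset, which only kicks in when there is no checkpoint and no valid stored offset. That would explain both the "ignored" auto.offset.reset=smallest observation below and why freshly named topics flow while the old ones hang on stale checkpoints. A config sketch to force a reset on one input stream (the stream name is taken from the task.inputs value in the posted config):

```
# Checkpointed offsets normally win; these per-stream settings tell Samza
# to ignore the stored checkpoint and start from the oldest available offset.
systems.kafka.streams.sample-raw-stream.samza.reset.offset=true
systems.kafka.streams.sample-raw-stream.samza.offset.default=oldest
```

Note that samza.reset.offset applies on every container start, so it is usually a temporary debugging setting rather than something to leave in place.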
On Wed, Jun 3, 2015 at 10:10 AM, Garrett Barton <garrett.bar...@gmail.com> wrote:
> Thanks all for the responses.
> As you can probably tell from the job name, I am doing a validation
> process that takes in JSON and tests it for a ton of things. This is why
> I am using String serdes instead of Samza's built-in JSON support: the
> JSON could be malformed and I still need to operate on it. My serde
> issues were from the malformed JSON being sent around; switching to
> StringSerde solved this. I have not seen the behavior from SAMZA-608 yet.
>
> Here is my current config; I started with the hello-samza wiki stat config:
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=validate-records
>
> # YARN
> yarn.package.path=hdfs://p934/user/bartong/realtime-validator.tar.gz
>
> # Task
> task.class=samza.RealtimeValidator
> task.inputs=kafka.sample-raw-stream
> task.window.ms=60000
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> # Normally, this would be 3, but we have only one broker.
> task.checkpoint.replication.factor=1
>
> # Serializers
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> serializers.registry.long.class=org.apache.samza.serializers.LongSerdeFactory
> serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
>
> # Systems
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.key.serde=string
> systems.kafka.samza.msg.serde=string
> systems.kafka.streams.metrics-samza.samza.msg.serde=metrics
> systems.kafka.consumer.zookeeper.connect=zk1:2181/
> systems.kafka.consumer.auto.offset.reset=smallest
> systems.kafka.producer.bootstrap.servers=ka1:6667
>
> # Key-value storage
> stores.validate-records.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> stores.validate-records.changelog=kafka.validate-records-changelog
> stores.validate-records.key.serde=string
> stores.validate-records.msg.serde=long
>
> # Normally, we'd leave this alone, but we have only one broker.
> stores.validate-records.changelog.replication.factor=1
>
> # Normally, we'd set this much higher, but we want things to look snappy
> # in the demo.
> stores.validate-records.write.batch.size=0
> stores.validate-records.object.cache.size=0
>
> Some things I've noticed while testing:
> Setting auto.offset.reset=smallest is not taking me back to the start of
> the streams. It's like it's being ignored. I also don't see consumer
> properties output in the logs, though I do see producer props.
>
> On Tue, Jun 2, 2015 at 8:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Garrett,
>>
>> Regarding the serde issues that Yan mentioned, you can check
>> https://issues.apache.org/jira/browse/SAMZA-608 and see if its
>> description matches what you saw.
>>
>> Guozhang
>>
>> On Tue, Jun 2, 2015 at 4:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
>>
>> > Hi Garrett,
>> >
>> > I guess you ran into the serde issues you mentioned. If you can check
>> > the Samza log and show us, we will be more helpful. Also, maybe paste
>> > the config here (if you don't mind), and we can help to see if you
>> > missed something.
>> >
>> > Thanks,
>> >
>> > Fang, Yan
>> > yanfang...@gmail.com
>> >
>> > On Tue, Jun 2, 2015 at 3:01 PM, Garrett Barton <garrett.bar...@gmail.com> wrote:
>> >
>> > > Greetings all,
>> > >
>> > > I am trying to translate an existing workflow from MR into Samza.
>> > > Thus far everything is coded and the kinks with deploying have been
>> > > worked out. My task deploys into YARN (2.6.0) and consumes records
>> > > from Kafka (0.8.2.1) fine, but no data from metrics or my output
>> > > streams is showing up in Kafka. I see the metrics topic created in
>> > > Kafka, but it's empty (I have a counter counting records seen).
>> > >
>> > > I have debug prints showing me that I am calling collector.send(),
>> > > which is also wrapped in a catch for Throwable. Nothing at all shows
>> > > in the logs.
>> > >
>> > > I do see the checkpoint topic being used and incremented
>> > > appropriately. So between that and consuming records in the first
>> > > place, I think system.kafka is configured correctly. I ran into
>> > > serde issues with consumption and sending and fixed those too.
>> > >
>> > > Has anyone run into this kind of behavior? I'm hoping it's a dumb
>> > > config issue.
>> > >
>> > > V/R,
>> > > ~Garrett
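One thing that stands out in the posted config (an editor's observation, not raised in the thread): it assigns a serde named "metrics" to the metrics stream, but that serde is never registered, and no metrics reporter is configured, which by itself would leave the metrics topic empty. For reference, the hello-samza style metrics wiring looks roughly like this (the "snapshot" reporter name and "metrics" stream name follow the hello-samza example config and may need adapting to the metrics-samza topic used above):

```
# Publish metrics snapshots to the kafka "metrics" stream, and register
# the "metrics" serde that the stream's msg.serde setting refers to.
metrics.reporters=snapshot
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
systems.kafka.streams.metrics.samza.msg.serde=metrics
```

Without the metrics.reporters lines, Samza still creates counters but never flushes them to Kafka, which matches the "topic exists but is empty" symptom described above.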