Hi All,
I am getting the following exception while starting a Samza job:
2015-08-26 12:05:13 VerifiableProperties [INFO] Verifying properties
2015-08-26 12:05:13 VerifiableProperties [INFO] Property auto.offset.reset is overridden to smallest
2015-08-26 12:05:13 VerifiableProperties [INFO] Property client.id is overridden to samza_consumer-samza_parser-1-1440615913700-5
2015-08-26 12:05:13 VerifiableProperties [INFO] Property group.id is overridden to undefined-samza-consumer-group-d005e5a6-7ee8-49ff-a6f7-782a1404402a
2015-08-26 12:05:13 VerifiableProperties [INFO] Property zookeeper.connect is overridden to 10.25.106.183:2181/
2015-08-26 12:05:13 SamzaContainer$ [INFO] Got store consumers: Map(samza-parser -> org.apache.samza.system.kafka.KafkaSystemConsumer@2913f73e)
2015-08-26 12:05:13 SamzaContainer$ [INFO] Got store consumers: Map(samza-parser -> org.apache.samza.system.kafka.KafkaSystemConsumer@2913f73e)
2015-08-26 12:05:13 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
java.util.NoSuchElementException: key not found: string
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.samza.container.SamzaContainer$$anonfun$37$$anonfun$40.apply(SamzaContainer.scala:456)
at org.apache.samza.container.SamzaContainer$$anonfun$37$$anonfun$40.apply(SamzaContainer.scala:448)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.samza.container.SamzaContainer$$anonfun$37.apply(SamzaContainer.scala:448)
at org.apache.samza.container.SamzaContainer$$anonfun$37.apply(SamzaContainer.scala:425)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:425)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Below is my samza-parser.properties file.
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=samza-parser
# YARN
yarn.package.path=file:///Documents/workspace/hello-samza/target/hello-samza-0.9.1-dist.tar.gz
# Task
task.class=samza.examples.wikipedia.task.Parser
task.inputs=kafka.samza_test
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1
task.window.ms=5000
# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.streams.metrics.samza.msg.serde=metrics
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=smallest
systems.kafka.producer.bootstrap.servers=localhost:9092
# Key-value storage
stores.samza-parser.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.samza-parser.changelog=kafka.samza-parser-changelog
stores.samza-parser.key.serde=string
stores.samza-parser.msg.serde=integer
# Normally, we'd leave this alone, but we have only one broker.
stores.samza-parser.changelog.replication.factor=1
# Normally, we'd set this much higher, but we want things to look snappy in the demo.
stores.samza-parser.write.batch.size=0
stores.samza-parser.object.cache.size=0
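
The "key not found: string" message looks like a failed lookup of the name "string" in some map. One hypothesis, not confirmed here, is that each serde name used by a *.serde key is expected to have a matching serializers.registry.<name>.class entry. A minimal sketch of a check for that, in Python, is below; the helper names (parse_properties, unregistered_serdes) and the SAMPLE excerpt are made up for illustration and are not part of Samza.

```python
import re

# Matches keys such as serializers.registry.json.class and captures "json".
REGISTRY_KEY = re.compile(r"^serializers\.registry\.(.+)\.class$")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def unregistered_serdes(props):
    """Return serde names referenced by *.serde keys but absent from the registry."""
    registered = set()
    for key in props:
        match = REGISTRY_KEY.match(key)
        if match:
            registered.add(match.group(1))
    # Assumption: any key ending in ".serde" names a registered serde.
    referenced = {value for key, value in props.items() if key.endswith(".serde")}
    return sorted(referenced - registered)


# Excerpt of the serde-related lines from the properties file above.
SAMPLE = """
serializers.registry.json.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
systems.kafka.samza.msg.serde=string
systems.kafka.streams.metrics.samza.msg.serde=metrics
stores.samza-parser.key.serde=string
stores.samza-parser.msg.serde=integer
"""

if __name__ == "__main__":
    print(unregistered_serdes(parse_properties(SAMPLE)))
```

If the hypothesis holds, any name this prints would be a candidate for the missing key in the exception.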
Can someone tell me where I am going wrong?
Regards,
Tushar Mhaskar