Hey there,
We have a Samza job running in YARN that reads from Kafka and writes
to Cassandra. The job also writes its metrics to a Kafka topic, and the
changelogs for the RocksDB stores and the checkpointed offsets are
written to Kafka as well.
We are seeing failed containers in YARN for this job. The job has a
total of 4 containers, but one container fails, and the replacement
containers that YARN allocates keep failing as well.
Here is the stack trace from one of the failed containers:
2016-11-18 22:01:10,930 62058 [main] ERROR o.a.s.s.kafka.KafkaSystemProducer - Unable to send message from TaskName-Partition 6 to system kafka
2016-11-18 22:01:10,943 62071 [main] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka.
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
    at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
    at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
    at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)
    at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
    at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.RunLoop$$anonfun$commit$1.apply$mcVJ$sp(RunLoop.scala:162)
    at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
    at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
    at org.apache.samza.container.RunLoop.commit(RunLoop.scala:157)
    at org.apache.samza.container.RunLoop.run(RunLoop.scala:76)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
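For context, the max.request.size shown in the producer config further below is still the Kafka default of 1048576 bytes. If the fix is to allow larger messages end to end, we believe the change would look roughly like the sketch below (the value 5242880 is just an example; the Samza properties use the systems.&lt;system&gt;.producer.* passthrough, and the broker settings are the standard Kafka server properties):

```
# Samza job config: raise the producer's maximum request size
# (Kafka producer default is 1048576 bytes). Any producer-level
# Kafka property can be passed through the systems.<system>.producer.* prefix.
systems.kafka.producer.max.request.size=5242880

# Kafka broker config (server.properties): the broker must also accept
# larger messages, and the replicas must be able to fetch them.
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
```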
We tried turning off writing the metrics to Kafka, but we still see the
problem. We enabled 'trace'-level logging for the 'org.apache.samza'
package and found the following additional info:
2016-11-18 22:01:10,323 61451 [main] INFO o.a.k.c.producer.ProducerConfig
- ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
acks = all
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [usw2a-daalt-an-kaf-int5.prsn.us:9092, usw2b-daalt-an-kaf-int5.prsn.us:9092, usw2c-daalt-an-kaf-int5.prsn.us:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
retries = 2147483647
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 1
metrics.num.samples = 2
linger.ms = 0
client.id = samza_checkpoint_manager-course_section_analytics-1-1479506410048-6
This leads us to believe that the job is still producing the metrics to
Kafka despite our having disabled metrics reporting.
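For reference, this is roughly how the metrics reporting to Kafka was configured before we removed it (a sketch from memory, using the standard Samza MetricsSnapshotReporter property names; 'snapshot' and the 'kafka.metrics' stream name are just our choices):

```
# Samza job config for writing metrics snapshots to a Kafka topic.
# We removed these lines entirely when disabling metrics reporting.
metrics.reporters=snapshot
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
```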
Any help from you guys would be much appreciated. Have a nice weekend!
Thanks,
Suraj Choudhary