Hey there,

We are running a Samza job in YARN that reads from Kafka and writes to
Cassandra. The job also writes its metrics to a Kafka topic, and the
changelogs for its RocksDB stores and the checkpointed offsets are written
to Kafka as well.
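
For reference, here is a minimal sketch of the relevant parts of our job
config; the system, store, and topic names below are placeholders rather
than our exact values:

        # Kafka system the job reads from and writes to
        systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

        # Checkpoint offsets to Kafka
        task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
        task.checkpoint.system=kafka

        # RocksDB store with a Kafka-backed changelog (store and topic names are placeholders)
        stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
        stores.my-store.changelog=kafka.my-store-changelog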

We are seeing failed containers in YARN for this job. The job has a total
of 4 containers, but one container fails and the replacement containers
that YARN allocates keep failing as well.

Here is the stack trace from one of the failed containers:

2016-11-18 22:01:10,930 62058 [main] ERROR o.a.s.s.kafka.KafkaSystemProducer - Unable to send message from TaskName-Partition 6 to system kafka
2016-11-18 22:01:10,943 62071 [main] ERROR o.a.samza.container.SamzaContainer - Caught exception in process loop.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 6 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
        at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
        at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
        at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
        at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
        at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)
        at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
        at org.apache.samza.container.RunLoop$$anonfun$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:162)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.container.RunLoop$$anonfun$commit$1.apply$mcVJ$sp(RunLoop.scala:162)
        at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
        at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
        at org.apache.samza.container.RunLoop.commit(RunLoop.scala:157)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:76)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
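
In case it is useful context: our understanding, which may well be wrong,
is that the limits involved in a RecordTooLargeException are the
producer-side max.request.size (1048576 in the config dump below) and the
broker- or topic-side message limit (message.max.bytes / max.message.bytes).
If raising the limits were the right answer, we assume the knobs would look
roughly like the following, with the values being purely illustrative:

        # Producer side, passed through Samza's per-system producer config (illustrative value)
        systems.kafka.producer.max.request.size=2097152

        # Broker side (server.properties), or per topic via max.message.bytes (illustrative value)
        message.max.bytes=2097152

That said, we would rather understand what is producing such a large
message in the first place.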

We tried turning off the metrics writing to Kafka, but we are still seeing
the problem. We enabled 'trace'-level logging for the 'org.apache.samza'
package and found the following additional info:

2016-11-18 22:01:10,323 61451 [main] INFO  o.a.k.c.producer.ProducerConfig
- ProducerConfig values:
        compression.type = none
        metric.reporters = []
        metadata.max.age.ms = 300000
        metadata.fetch.timeout.ms = 60000
        acks = all
        batch.size = 16384
        reconnect.backoff.ms = 10
        bootstrap.servers = [usw2a-daalt-an-kaf-int5.prsn.us:9092, usw2b-daalt-an-kaf-int5.prsn.us:9092, usw2c-daalt-an-kaf-int5.prsn.us:9092]
        receive.buffer.bytes = 32768
        retry.backoff.ms = 100
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        retries = 2147483647
        max.request.size = 1048576
        block.on.buffer.full = true
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        metrics.sample.window.ms = 30000
        send.buffer.bytes = 131072
        max.in.flight.requests.per.connection = 1
        metrics.num.samples = 2
        linger.ms = 0
        client.id = samza_checkpoint_manager-course_section_analytics-1-1479506410048-6


This leads us to believe that metrics are still being produced to Kafka in
spite of our having disabled the metrics writing.
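
In case it helps to see exactly what we changed, the metrics-related
properties we disabled look roughly like the following; the reporter name
and topic are placeholders for our actual values:

        # Commented out to disable metrics publishing to Kafka (placeholder names)
        # metrics.reporters=snapshot
        # metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
        # metrics.reporter.snapshot.stream=kafka.metrics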

Any help from you guys would be much appreciated. Have a nice weekend!


Thanks,
Suraj Choudhary
