Hi,

We are seeing some strange behavior with KafkaUtils DirectStream and the
size of the MapPartitionsRDDs.

We use a permanent direct stream from which we consume about 8,500 JSON
messages/sec.
The JSON messages are read, some information is extracted from each one,
and the result per message is a string that we collect/group with
reduceByKeyAndWindow.

The windowLength and slideInterval are both 60 seconds.
The result of each window is sent back to another Kafka topic.
The batch duration is 20 seconds.
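
To illustrate the setup, here is a minimal sketch of the pipeline (the
broker addresses, topic names, and the extractKeyAndValue/sendToKafka
helpers are placeholders, not our real code):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("json-window-aggregation")
val ssc = new StreamingContext(sparkConf, Seconds(20))  // 20-second batches

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("input-topic-a", "input-topic-b")       // placeholder topic names

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// Extract a key and a result string from each JSON message (placeholder helper)
val extracted = stream.map { case (_, json) => extractKeyAndValue(json) }

// Collect/group the strings over a 60-second window, sliding every 60 seconds
val windowed = extracted.reduceByKeyAndWindow(
  (a: String, b: String) => a + "\n" + b, Seconds(60), Seconds(60))

// Send the result of each window back to another Kafka topic (producer code omitted)
windowed.foreachRDD { rdd =>
  rdd.foreachPartition(partition => sendToKafka("output-topic", partition))
}

ssc.start()
ssc.awaitTermination()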

The RDDs keep growing over time, and the amount of data transferred to
the executors grows with them.
Since the RDDs are growing, the time to send them over the network and
the processing time grow as well.

We start with a processing time of 4 seconds; after 20 hours we are at
10 seconds.
The processing time keeps growing, and at some point it stays
permanently above 20 seconds (the batch duration). From then on Spark
runs out of memory or runs into other problems.

Does anybody have an idea how to fix this?

We are using Spark 1.5.2 and Kafka 0.8.2.2. We read two Kafka topics,
one with 200 partitions and one with 20 partitions.
Spark runs in standalone cluster mode on three AWS instances.
We use 3 worker nodes with 23 executors in total.

We start the app with these parameters:
--conf "spark.akka.frameSize=160"
--conf "spark.cleaner.referenceTracking.blocking=true"
--conf "spark.cleaner.referenceTracking.blocking.shuffle=true"
--conf "spark.cleaner.referenceTracking.cleanCheckpoints=true"
--conf "spark.cleaner.ttl=600"
--conf "spark.default.parallelism=69"
--conf "spark.executor.cores=1"
--conf "spark.executor.memory=7g"
--conf "spark.kryoserializer.buffer.max=256m"
--conf "spark.rrd.compress=true"
--conf "spark.storage.memoryFraction=0.2"
--conf "spark.streaming.backpressure.enabled=true"
--conf "spark.streaming.kafka.maxRatePerPartition=75"
--conf "spark.streaming.receiver.maxRate=15000"
--conf "spark.streaming.stopGracefullyOnShutdown=true"
--deploy-mode cluster
--supervise
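
Put together, the submit command looks roughly like this (the master
URL, main class, and jar name below are placeholders):

spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --supervise \
  --conf "spark.akka.frameSize=160" \
  ...                                 (remaining --conf flags as listed above) \
  --class com.example.StreamingJob \
  streaming-job-assembly-1.0.jar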

Thanks,
Uwe
