Hi, we have some strange behavior with KafkaUtils DirectStream and the size of the MapPartitionsRDDs.
We use a permanent direct stream that consumes about 8,500 JSON messages/sec. Each message is parsed, some information is extracted, and the result per message is a string that we collect/group with reduceByKeyAndWindow. The windowLength and slideInterval are both 60 seconds, and the result of each window is sent back to another Kafka topic. The batch duration is 20 seconds.

The RDDs keep growing, and so does the amount of data transferred to the executors. Because the RDDs grow, the time to ship them over the network and the processing time grow as well. We start with a processing time of 4 seconds; after 20 hours it has reached 10 seconds. It keeps increasing until, at some point, it stays permanently above 20 seconds (the batch duration), and Spark runs OOM or gets into other problems. Does anybody have an idea how to fix this?

We are using Spark 1.5.2 and Kafka 0.8.2.2. We read two Kafka topics - one with 200 partitions and one with 20 partitions. Spark runs in standalone cluster mode on three AWS instances; we have 3 worker nodes with 23 executors in total.

We start the app with these parameters:

--conf "spark.akka.frameSize=160"
--conf "spark.cleaner.referenceTracking.blocking=true"
--conf "spark.cleaner.referenceTracking.blocking.shuffle=true"
--conf "spark.cleaner.referenceTracking.cleanCheckpoints=true"
--conf "spark.cleaner.ttl=600"
--conf "spark.default.parallelism=69"
--conf "spark.executor.cores=1"
--conf "spark.executor.memory=7g"
--conf "spark.kryoserializer.buffer.max=256m"
--conf "spark.rrd.compress=true"
--conf "spark.storage.memoryFraction=0.2"
--conf "spark.streaming.backpressure.enabled=true"
--conf "spark.streaming.kafka.maxRatePerPartition=75"
--conf "spark.streaming.receiver.maxRate=15000"
--conf "spark.streaming.stopGracefullyOnShutdown=true"
--deploy-mode cluster
--supervise

Thanks,
Uwe
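P.S. For anyone trying to picture the job, here is a minimal sketch of the topology (Spark 1.5.x / Kafka 0.8 direct stream API). The topic names, broker list, checkpoint path, JSON extraction and the Kafka sink are placeholders for illustration, not our actual code:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WindowedAggregationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-window-sketch")
    val ssc = new StreamingContext(conf, Seconds(20))   // batch duration: 20 s
    ssc.checkpoint("/tmp/streaming-checkpoints")         // placeholder checkpoint path

    // Kafka 0.8 direct stream over the two input topics (placeholder names/brokers).
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("input-topic-200", "input-topic-20")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Parse each JSON message and extract a (key, string) pair (placeholder logic).
    val extracted = stream.map { case (_, json) => extractKeyAndValue(json) }

    // 60 s window, 60 s slide: three 20 s batches are aggregated per window.
    val windowed = extracted.reduceByKeyAndWindow(
      (a: String, b: String) => a + "\n" + b, Seconds(60), Seconds(60))

    // Send each window's result back to another Kafka topic (placeholder sink).
    windowed.foreachRDD { rdd =>
      rdd.foreachPartition { part => sendToKafka("output-topic", part) }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Placeholder: pull the grouping key and the reduced value out of one JSON message.
  def extractKeyAndValue(json: String): (String, String) = ("key", json)

  // Placeholder: publish the aggregated strings of one partition to Kafka.
  def sendToKafka(topic: String, records: Iterator[(String, String)]): Unit = ()
}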