Hi all,

If I am using a custom Receiver with the storage level set to StorageLevel.MEMORY_ONLY_SER_2 and the WAL enabled, I get this warning in the logs:
16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: Storage level replication 2 is unnecessary when write ahead log is enabled, change to replication 1
16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: User defined storage level StorageLevel(false, true, false, false, 2) is changed to effective storage level StorageLevel(false, true, false, false, 1) when write ahead log is enabled

My application is running on 4 Executors with 4 cores each, and 1 Receiver. Because the data is not replicated, the processing runs on only one Executor:

[image: Inline images 1]

Instead of 16 cores processing the Streaming data, only 4 are being used. We cannot repartition the DStream to distribute the data to more Executors, since if you call repartition on an RDD that is located on only one node, the new partitions are created only on that node, which doesn't help.

This theory that repartitioning doesn't help can be tested with the simple example below, which tries to go from one partition on a single node to many partitions on many nodes. What you find when you look at the multiplePartitions RDD in the UI is that its 6 partitions are all on the same Executor.

scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 4).cache.setName("rdd")
rdd: org.apache.spark.rdd.RDD[Int] = rdd ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd.count()
res0: Long = 6

scala> val singlePartition = rdd.repartition(1).cache.setName("singlePartition")
singlePartition: org.apache.spark.rdd.RDD[Int] = singlePartition MapPartitionsRDD[4] at repartition at <console>:29

scala> singlePartition.count()
res1: Long = 6

scala> val multiplePartitions = singlePartition.repartition(6).cache.setName("multiplePartitions")
multiplePartitions: org.apache.spark.rdd.RDD[Int] = multiplePartitions MapPartitionsRDD[8] at repartition at <console>:31

scala> multiplePartitions.count()
res2: Long = 6

Am I correct in my use of repartition, i.e. that the data does not get shuffled to other Executors if it is all on one Executor?
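For reference, here is a minimal sketch of the usual workaround of spreading ingestion across several receivers and unioning the resulting DStreams. It assumes a hypothetical custom receiver class MyReceiver producing Strings and an existing StreamingContext ssc; it is an illustration, not a drop-in implementation.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

// Hypothetical: MyReceiver extends Receiver[String]; ssc is an
// already-constructed StreamingContext.
val numReceivers = 4
val streams = (1 to numReceivers).map { _ =>
  ssc.receiverStream(new MyReceiver(StorageLevel.MEMORY_ONLY_SER))
}
// Union the per-receiver DStreams so downstream operations see a single
// stream whose blocks already live on several Executors.
val unioned: DStream[String] = ssc.union(streams)
```

Each receiver occupies one core for the lifetime of the application, so with 4 receivers on 4 Executors of 4 cores each, 4 of the 16 cores are dedicated to ingestion.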
Shouldn't I be allowed to set the Receiver replication factor to two when the WAL is enabled, so that multiple Executors can work on the Streaming input data?

We will look into creating 4 Receivers so that the data gets distributed more evenly. But won't that "waste" 4 cores in our example, where one would do?

Best regards,
Patrick