Hi all,

If I use a Custom Receiver with the storage level set to
StorageLevel.MEMORY_ONLY_SER_2 and the WAL enabled, I get this warning in the logs:

16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: Storage level
replication 2 is unnecessary when write ahead log is enabled, change
to replication 1
16/04/13 14:03:15 WARN WriteAheadLogBasedBlockHandler: User defined
storage level StorageLevel(false, true, false, false, 2) is changed to
effective storage level StorageLevel(false, true, false, false, 1)
when write ahead log is enabled
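
For context, here is a minimal sketch of the setup that triggers the
warning. MyReceiver, the app name, and the checkpoint path are
placeholders, not our actual code; spark.streaming.receiver.writeAheadLog.enable
is the standard switch for the receiver WAL:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Placeholder receiver that asks for 2x replication via MEMORY_ONLY_SER_2.
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY_SER_2) {
  def onStart(): Unit = {
    new Thread("MyReceiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("sample record")  // stored with the level passed to the constructor
          Thread.sleep(100)
        }
      }
    }.start()
  }
  def onStop(): Unit = {}
}

val conf = new SparkConf()
  .setAppName("WALReplicationExample")
  // Enabling the WAL is what downgrades the effective replication to 1.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/checkpoint")  // the WAL requires a checkpoint directory
val stream = ssc.receiverStream(new MyReceiver)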


My application runs on 4 Executors with 4 cores each, and 1 Receiver.
Because the data is not replicated, the processing runs on only one
Executor:

[image: Spark UI screenshot showing the Streaming tasks running on a single Executor]

Instead of 16 cores processing the Streaming data, only 4 are being used.

We cannot repartition the DStream to distribute the data to more Executors:
if you call repartition on an RDD that is located entirely on one node, the
new partitions are created only on that node, which doesn't help.  This
theory that repartitioning doesn't help can be tested with the simple
example below, which tries to go from one partition on a single node to
many partitions on many nodes.  What you find when you look at the
multiplePartitions RDD in the UI is that all 6 of its partitions are on the
same Executor.

scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 4).cache.setName("rdd")
rdd: org.apache.spark.rdd.RDD[Int] = rdd ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd.count()
res0: Long = 6

scala> val singlePartition = rdd.repartition(1).cache.setName("singlePartition")
singlePartition: org.apache.spark.rdd.RDD[Int] = singlePartition MapPartitionsRDD[4] at repartition at <console>:29

scala> singlePartition.count()
res1: Long = 6

scala> val multiplePartitions = singlePartition.repartition(6).cache.setName("multiplePartitions")
multiplePartitions: org.apache.spark.rdd.RDD[Int] = multiplePartitions MapPartitionsRDD[8] at repartition at <console>:31

scala> multiplePartitions.count()
res2: Long = 6
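
As a side note, partition placement can also be checked without the UI by
tagging each partition with the host it runs on. A small sketch:
mapPartitionsWithIndex is standard Spark, and the hostname lookup is just
for illustration:

// Prints one line per partition with the host that holds it.
multiplePartitions.mapPartitionsWithIndex { (idx, iter) =>
  val host = java.net.InetAddress.getLocalHost.getHostName
  Iterator(s"partition $idx on $host: ${iter.size} element(s)")
}.collect().foreach(println)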

Am I correct in my use of repartition, i.e. that the data does not get
shuffled to other Executors if it all starts on one Executor?

Shouldn't I be allowed to set the Receiver replication factor to two
when the WAL is enabled so that multiple Executors can work on the
Streaming input data?

We will look into creating 4 Receivers so that the data gets distributed
more evenly.  But won't that "waste" 4 cores in our example, where one
would do?
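
For reference, the multi-receiver workaround we have in mind would look
roughly like this (a sketch; MyReceiver is the placeholder receiver from
above, and StreamingContext.union merges the per-receiver streams):

// Start several receivers so that input blocks land on several Executors,
// then union the resulting streams into one DStream for processing.
val numReceivers = 4
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyReceiver))
val unified = ssc.union(streams)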

Best regards,
Patrick
