Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/2940#discussion_r19507777
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
---
@@ -0,0 +1,221 @@
+package org.apache.spark.streaming
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import akka.actor.{ActorSystem, Props}
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark._
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage._
+import org.apache.spark.streaming.util._
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.util.AkkaUtils
+import WriteAheadLogBasedBlockHandler._
+import WriteAheadLogSuite._
+
+class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with
Matchers with Logging {
+
+ val conf = new
SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+ val hadoopConf = new Configuration()
+ val storageLevel = StorageLevel.MEMORY_ONLY_SER
+ val streamId = 1
+ val securityMgr = new SecurityManager(conf)
+ val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ val shuffleManager = new HashShuffleManager(conf)
+ val serializer = new KryoSerializer(conf)
+ val manualClock = new ManualClock
+ val blockManagerSize = 10000000
+
+ var actorSystem: ActorSystem = null
+ var blockManagerMaster: BlockManagerMaster = null
+ var blockManager: BlockManager = null
+ var receivedBlockHandler: ReceivedBlockHandler = null
+ var tempDirectory: File = null
+
+ before {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+ this.actorSystem = actorSystem
+ conf.set("spark.driver.port", boundPort.toString)
+
+ blockManagerMaster = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf,
new LiveListenerBus))),
+ conf, true)
+
+ blockManager = new BlockManager("bm", actorSystem, blockManagerMaster,
serializer,
+ blockManagerSize, conf, mapOutputTracker, shuffleManager,
+ new NioBlockTransferService(conf, securityMgr))
+
+ tempDirectory = Files.createTempDir()
+ manualClock.setTime(0)
+ }
+
+ after {
+ if (receivedBlockHandler != null) {
+ if
(receivedBlockHandler.isInstanceOf[WriteAheadLogBasedBlockHandler]) {
+
receivedBlockHandler.asInstanceOf[WriteAheadLogBasedBlockHandler].stop()
+ }
+ }
+ if (blockManager != null) {
+ blockManager.stop()
+ blockManager = null
+ }
+ if (blockManagerMaster != null) {
+ blockManagerMaster.stop()
+ blockManagerMaster = null
+ }
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ actorSystem = null
+
+ if (tempDirectory != null && tempDirectory.exists()) {
+ FileUtils.deleteDirectory(tempDirectory)
+ tempDirectory = null
+ }
+ }
+
+ test("BlockManagerBasedBlockHandler - store blocks") {
+ createBlockManagerBasedBlockHandler()
--- End diff --
That is most probably easier to read. Let me try that out. Thanks for the
idea!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]