That is definitely weird. spark.cores.max should not affect anything when
running in local mode.

Also, I am trying to think of scenarios that could cause a broadcast
variable used in the current job to fall out of scope, but they all seem
very far-fetched. So I am really curious to see the code where this could
be happening.
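
For context on what "falling out of scope" means to the cleaner: cleanup is driven by weak references, so it can only be triggered once no strong reference to the object remains and the JVM collects it. The following is a minimal plain-Java sketch of that pattern, not Spark's actual ContextCleaner code; the class name, the helper, and the byte array standing in for a broadcast variable are all made up for illustration.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public class WeakCleanupSketch {
    /** True while the referent has not been cleared by the garbage collector. */
    static boolean isAlive(WeakReference<?> ref) {
        return ref.get() != null;
    }

    public static void main(String[] args) throws InterruptedException {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();

        // Hypothetical stand-in for a broadcast variable held by user code.
        Object broadcastLike = new byte[1024];
        WeakReference<Object> ref = new WeakReference<>(broadcastLike, queue);

        // While user code still holds a strong reference, nothing can be cleaned.
        System.out.println("same object while strongly held: " + (ref.get() == broadcastLike));

        // Drop the only strong reference; the object is now eligible for collection.
        broadcastLike = null;
        System.gc(); // only a hint, the JVM may or may not collect right away

        // A cleaner thread would poll the queue and run cleanup for each entry.
        // remove(timeout) returns null if nothing was enqueued in time.
        boolean enqueued = queue.remove(1000) != null;
        System.out.println("enqueued for cleanup: " + enqueued);
        System.out.println("still alive: " + isAlive(ref));
    }
}
```

Under this pattern, a cleanup in the middle of a job would mean that nothing on the driver side was holding a strong reference to the variable at that moment, which is why the code that creates the broadcast is the interesting part.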

Either way, you could turn off the behavior by setting
spark.cleaner.referenceTracking=false
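
In a test suite, one way to apply this setting is as a JVM system property set before the SparkContext is created, since a SparkConf built with its default constructor picks up spark.* system properties. A minimal sketch in plain Java, assuming that approach fits your test setup; the class and method names are hypothetical and only the property key comes from this thread:

```java
public class CleanerConfigSketch {
    // Property key quoted in this thread; the rest of this class is illustrative.
    static final String REFERENCE_TRACKING = "spark.cleaner.referenceTracking";

    /** Call this before the SparkContext is created so the setting is visible to it. */
    static void disableReferenceTracking() {
        System.setProperty(REFERENCE_TRACKING, "false");
    }

    public static void main(String[] args) {
        disableReferenceTracking();
        // Print the effective value to confirm what a later SparkConf would read.
        System.out.println(REFERENCE_TRACKING + "=" + System.getProperty(REFERENCE_TRACKING));
    }
}
```

Setting the same key directly on the SparkConf used by the test should work equally well; the system property is just easier to apply to an existing suite without touching each test.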

TD


On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:

>  Hi, TD,
>
> I think I have more insight into the problem.
>
> In the Jenkins test file, I mistakenly passed a wrong value for spark.cores.max,
> one much larger than the expected value
>
> (I passed the master address as local[6], and spark.cores.max as 200).
>
> If I set a more consistent value, everything goes well.
>
> But I would not expect this problem to appear even if spark.cores.max is
> too large?
>
> Best,
>
> --
> Nan Zhu
>
> On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote:
>
>  Hi, TD,
>
> Thanks for the reply
>
> I tried to reproduce this in a simpler program, but had no luck.
>
> However, the program is already very simple: it just loads some files from HDFS
> and writes them to HBase.
>
> ---
>
> It seems that the issue only appears when I run the unit test in Jenkins
> (it does not fail every time; usually it succeeds about 1 time in 10).
>
> I once suspected that it was related to some concurrency issue, but even after I
> disabled parallel test execution in build.sbt, the problem is still there.
>
> ---
>
> Best,
>
> --
> Nan Zhu
>
> On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:
>
> The ContextCleaner cleans up data and metadata related to RDDs and
> broadcast variables only when those variables are no longer in scope and get
> garbage-collected by the JVM. So the broadcast variable in question is
> probably somehow going out of scope even while the job using the broadcast
> variable is still in progress.
>
> Could you reproduce this behavior reliably in a simple code snippet that
> you can share with us?
>
> TD
>
>
>
> On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:
>
>  Hi, all
>
> When I run a Spark application (actually a unit test of the application in
> Jenkins), I find that I always hit a FileNotFoundException when
> reading a broadcast variable.
>
> The program itself works well; only the unit test fails.
>
> Here is the example log:
>
>
> 14/07/21 19:49:13 INFO Executor: Running task ID 4
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost 
> (progress: 3/106)
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
> hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes 
> in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 5
> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
> *14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost 
> (progress: 4/106)
> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
> *14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from
> memory (free 886623436)*
> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
> 14/07/21 19:49:13 INFO HadoopRDD: Input split:
> hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
> 14/07/21 19:49:13 INFO HadoopRDD: Input split:
> hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
> 14/07/21 19:49:13 INFO Executor: Finished task ID 4
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes 
> in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 6
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost 
> (progress: 5/106)
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
> hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
> 14/07/21 19:49:13 INFO Executor: Finished task ID 5
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on 
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes 
> in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 7
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost 
> (progress: 6/106)
> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
> java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
>       at 
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
>       at 
> org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
>       at 
> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
>       at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>       at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>       at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>       at 
> org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>       at 
> org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>       at 
> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
>
>
> I highlighted the lines indicating that the ContextCleaner cleaned the broadcast
> variable. I’m wondering why the variable was cleaned, since there is enough
> memory space?
>
>
> Best,
>
>
> --
> Nan Zhu
>
>
>
>
>
