[ 
https://issues.apache.org/jira/browse/SPARK-17624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596484#comment-15596484
 ] 

Tathagata Das commented on SPARK-17624:
---------------------------------------

I figured out the problem. Submitting a patch. 

[~aroberts] To answer your question: the StateStore maintenance thread (in the 
executor) communicates with the StateStoreCoordinator (in the driver) to 
determine whether a state store should be unloaded. When the SparkContext 
is stopped, the StateStore is expected to fail to reach the 
StateStoreCoordinator and, as a result, unload all loaded StateStores. By 
default, the RPC layer makes 3 attempts, each with a large timeout, to reach 
the StateStoreCoordinator before failing. I set numRetries to 1 in the 
"maintenance" test so that it fails faster.
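
For anyone following along, here is a rough sketch of the behaviour described above. It is not the Spark code: it is a simplified Python model, and the names CoordinatorUnavailable, ask_with_retry, do_maintenance and verify_active are hypothetical, made up for illustration. It only shows why a lower retry count makes the "unload everything" path trigger sooner once the coordinator is gone.

```python
import time


class CoordinatorUnavailable(Exception):
    """Hypothetical error: the coordinator endpoint cannot be reached."""


def ask_with_retry(ask, num_retries=3, retry_wait_s=0.0):
    """Call ask() up to num_retries times; re-raise the last error if every attempt fails."""
    last_error = None
    for attempt in range(1, num_retries + 1):
        try:
            return ask()
        except CoordinatorUnavailable as err:
            last_error = err
            if attempt < num_retries:
                time.sleep(retry_wait_s)  # wait before the next attempt
    raise last_error


def do_maintenance(loaded_stores, verify_active, num_retries=3):
    """One maintenance pass over a set of loaded store ids.

    Stores the coordinator reports as inactive are unloaded one by one.
    If the coordinator cannot be reached at all, every store is unloaded.
    """
    try:
        for store_id in list(loaded_stores):
            active = ask_with_retry(lambda: verify_active(store_id), num_retries)
            if not active:
                loaded_stores.discard(store_id)
    except CoordinatorUnavailable:
        # Assumption in this sketch: an unreachable coordinator means the
        # context is gone, so nothing should stay loaded.
        loaded_stores.clear()
```

In this model, a pass with num_retries=1 against an unreachable coordinator gives up after a single failed call and clears the set, whereas num_retries=3 makes three calls (plus any waits) before reaching the same result.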

> Flaky test? StateStoreSuite maintenance
> ---------------------------------------
>
>                 Key: SPARK-17624
>                 URL: https://issues.apache.org/jira/browse/SPARK-17624
>             Project: Spark
>          Issue Type: Test
>          Components: Tests
>    Affects Versions: 2.0.1, 2.1.0
>            Reporter: Adam Roberts
>            Priority: Minor
>
> I've noticed this test failing consistently (25 times in a row) on a two-core 
> machine, but not on an eight-core machine.
> If we increase the spark.rpc.numRetries value used in the test from 1 to 2 (3 
> being the default in Spark), the test reliably passes. We can also gain 
> reliability by setting the master to anything other than just local.
> Is there a reason spark.rpc.numRetries is set to 1?
> This failure is also mentioned here, so it has been flaky for a while: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-2-0-0-RC5-td18367.html
> If we run without the "quietly" code, we get debug info:
> {code}
> 16:26:15.213 WARN org.apache.spark.rpc.netty.NettyRpcEndpointRef: Error 
> sending message [message = 
> VerifyIfInstanceActive(StateStoreId(/home/aroberts/Spark-DK/sql/core/target/tmp/spark-cc44f5fa-b675-426f-9440-76785c365507/ૺꎖ鮎衲넅-28e9196f-8b2d-43ba-8421-44a5c5e98ceb,0,0),driver)]
>  in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>         at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>         at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>         at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:91)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
>         at scala.Option.map(Option.scala:146)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStore.scala:227)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:199)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:197)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance(StateStore.scala:197)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anon$1.run(StateStore.scala:180)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(Thread.java:785)
> Caused by: org.apache.spark.SparkException: Could not find 
> StateStoreCoordinator.
>         at 
> org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
>         at 
> org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:129)
>         at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:225)
>         at 
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>         at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>         ... 19 more
> 16:26:15.217 WARN org.apache.spark.sql.execution.streaming.state.StateStore: 
> Error managing StateStore[id = (op=0, part=0), dir = 
> /home/aroberts/Spark-DK/sql/core/target/tmp/spark-cc44f5fa-b675-426f-9440-76785c365507/ૺꎖ鮎衲넅-28e9196f-8b2d-43ba-8421-44a5c5e98ceb/0/0],
>  stopping management thread
> - maintenance *** FAILED ***
>   The code passed to eventually never returned normally. Attempted 636 times 
> over 10.009220005000001 seconds. Last failure message: 
> StateStoreSuite.this.fileExists(provider, 1L, false) was true earliest file 
> not deleted. (StateStoreSuite.scala:396)
> Run completed in 19 seconds, 181 milliseconds.
> Total number of tests run: 9
> Suites: completed 2, aborted 0
> Tests: succeeded 8, failed 1, canceled 0, ignored 0, pending 0
> *** 1 TEST FAILED ***
> {code}
> With local[2] (presumably what .setMaster local resolves to on a two-core 
> box?), the test reliably passes. This is also the case for local[1] and 
> local[4]. For example:
> {code}
> StateStoreSuite:
> 16:17:34.379 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
> native-hadoop library for your platform... using builtin-java classes where 
> applicable
> - get, put, remove, commit, and all data iterator
> - updates iterator with all combos of updates and removes
> - cancel
> - getStore with unexpected versions
> - snapshotting
> - cleaning
> - corrupted file handling
> - StateStore.get
> In the maint test
> In eventually timeout 10
> Done assert 1
> Now doing assert 2
> Done assert 2
> 16:17:38.207 WARN org.apache.spark.rpc.netty.NettyRpcEndpointRef: Error 
> sending message [message = 
> VerifyIfInstanceActive(StateStoreId(/home/aroberts/Spark-DK/sql/core/target/tmp/spark-24d82fd9-252e-444c-8951-c4e3dadd854f/닟述뮰⮤ꂟ-4a0e3477-59ca-4444-9181-8937e5f66d0c,0,0),driver)]
>  in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>         at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>         at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>         at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>         at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:91)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$3.apply(StateStore.scala:227)
>         at scala.Option.map(Option.scala:146)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStore.scala:227)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:199)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anonfun$org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance$2.apply(StateStore.scala:197)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance(StateStore.scala:197)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$$anon$1.run(StateStore.scala:180)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:319)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:191)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(Thread.java:785)
> Caused by: org.apache.spark.SparkException: Could not find 
> StateStoreCoordinator.
>         at 
> org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:154)
>         at 
> org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:129)
>         at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:225)
>         at 
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>         at 
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>         ... 19 more
> 16:17:38.209 WARN org.apache.spark.sql.execution.streaming.state.StateStore: 
> Error managing StateStore[id = (op=0, part=0), dir = 
> /home/aroberts/Spark-DK/sql/core/target/tmp/spark-24d82fd9-252e-444c-8951-c4e3dadd854f/닟述뮰⮤ꂟ-4a0e3477-59ca-4444-9181-8937e5f66d0c/0/0],
>  stopping management thread
> - maintenance
> Run completed in 9 seconds, 317 milliseconds.
> Total number of tests run: 9
> Suites: completed 2, aborted 0
> Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0
> All tests passed.
> {code}



