Hi,

I have a Spark Streaming application that reads messages from Kafka
(multiple topics) and aggregates the data via updateStateByKey, running on
50 Spark workers, each with 1 core and 6 GB of RAM.  It works fine for the
first 10 minutes or so, but then it gets stuck inside the foreachRDD call.
Below are the log (which keeps repeating while it is stuck), the executor
status, and the relevant code.
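For context, the input stream is set up roughly like this (a simplified
sketch only: the batch interval, checkpoint path, ZooKeeper quorum, consumer
group, topic map, and the parseMessage helper are placeholders, not the real
values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("AnalyticsStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(5))      // batch interval is a placeholder
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")          // checkpointing is required for updateStateByKey

// Receiver-based Kafka input over multiple topics; connection details are placeholders.
val topics = Map("topicA" -> 1, "topicB" -> 1)
val analyticsDStream = KafkaUtils.createStream(ssc, "zk-host:2181", "analytics-group", topics)
  .map { case (_, msg) => parseMessage(msg) }               // hypothetical parse into (key, value) pairs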


Log:
15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_873_piece0
15/06/06 12:55:20 INFO BlockManager: Removing broadcast 875
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875_piece0
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875_piece0 of size 1820 dropped from memory (free 3369792320)
15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_875_piece0
15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875
15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875 of size 2624 dropped from memory (free 3369794944)
15/06/06 12:56:13 INFO MemoryStore: ensureFreeSpace(1650825) called with curMem=24335467, maxMem=3394130411
15/06/06 12:56:13 INFO MemoryStore: Block input-0-1433620573400 stored as bytes in memory (estimated size 1612.1 KB, free 3.1 GB)
15/06/06 12:56:13 INFO BlockManagerMaster: Updated info of block input-0-1433620573400
15/06/06 12:56:20 INFO MemoryStore: ensureFreeSpace(1682877) called with curMem=25986292, maxMem=3394130411
15/06/06 12:56:20 INFO MemoryStore: Block input-0-1433620579800 stored as bytes in memory (estimated size 1643.4 KB, free 3.1 GB)
15/06/06 12:56:20 INFO BlockManagerMaster: Updated info of block input-0-1433620579800
15/06/06 12:56:25 INFO MemoryStore: ensureFreeSpace(1642661) called with curMem=27669169, maxMem=3394130411
15/06/06 12:56:25 INFO MemoryStore: Block input-0-1433620585000 stored as bytes in memory (estimated size 1604.2 KB, free 3.1 GB)
15/06/06 12:56:25 INFO BlockManagerMaster: Updated info of block input-0-1433620585000


Executor status:

Tasks:
  Task index 1 (ID 9087, attempt 0): SUCCESS, PROCESS_LOCAL, executor 40 / 10.10.10.10, launched 2015/06/06 12:54:26, duration 3 s, input 7.7 MB (memory) / 11662 records, shuffle read 337.2 KB / 6588 records
  Task index 0 (ID 9086, attempt 0): RUNNING, PROCESS_LOCAL, executor 6 / 10.10.10.10, launched 2015/06/06 12:54:26, duration 3.7 min, GC time 2 s, input 7.8 MB (memory) / 11792 records, shuffle read 327.4 KB / 6422 records

Executors (Memory: 39.0 MB used of 160.1 GB total; Disk: 0.0 B used):
  Executor 6 (10.10.10.10:49298): 2 RDD blocks, 15.6 MB / 3.2 GB memory used, 0.0 B disk used, 1 active / 0 failed / 412 complete / 413 total tasks, task time 7.0 m, input 1291.3 MB, shuffle read 8.3 MB, shuffle write 1437.1 KB
  Executor 40 (10.10.10.10:37480): 3 RDD blocks, 23.4 MB / 3.2 GB memory used, 0.0 B disk used, 0 active / 0 failed / 413 complete / 413 total tasks, task time 7.1 m, input 1288.6 MB, shuffle read 10.8 MB, shuffle write 4.0 MB


Sample code:
val stateDStream = analyticsDStream.updateStateByKey[StreamBase](
  updateAnalyticsIterFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  true)  // rememberPartitioner

stateDStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { record =>
      // Do nothing, and it still gets stuck
    }
  }
}
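For reference, updateAnalyticsIterFunc follows the iterator-based signature
that this three-argument updateStateByKey overload expects. A minimal sketch
(assuming String keys and StreamBase batch values, which are placeholders;
the real merge logic is omitted):

// Sketch only: keep the newest value for a key, or carry the previous state forward.
def updateAnalyticsIterFunc(
    iter: Iterator[(String, Seq[StreamBase], Option[StreamBase])]
): Iterator[(String, StreamBase)] = {
  iter.flatMap { case (key, newValues, prevState) =>
    newValues.lastOption.orElse(prevState).map(key -> _)
  }
}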

There are no error messages, and the memory usage looks fine.  Is there any
reason it would get stuck?  How can I resolve this issue?

Thank you for your help,
Eason



