Hi,

I have a Spark Streaming application that reads messages from Kafka (multiple topics) and aggregates the data via updateStateByKey, running on 50 Spark workers with 1 core and 6 GB of RAM each. It works fine for the first 10 minutes or so, but then it gets stuck inside the foreachRDD call. Below are a simplified sketch of the setup, the log (which keeps repeating while the job is stuck), the executor status, and the code.
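For context, the stream is set up roughly like this (a simplified sketch; the broker/ZooKeeper list, topic names, group id, batch interval, checkpoint path, and key extraction are illustrative placeholders, not the real values):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("AnalyticsStreaming")
    val ssc  = new StreamingContext(conf, Seconds(10))       // batch interval is a placeholder
    ssc.checkpoint("hdfs:///tmp/analytics-checkpoint")       // checkpointing is required by updateStateByKey

    // Receiver-based Kafka stream over several topics (topic names and receiver threads are placeholders)
    val topics = Map("topicA" -> 2, "topicB" -> 2, "topicC" -> 2)
    val kafkaDStream = KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181", "analytics-group", topics)

    // Turn each message into a keyed record for updateStateByKey;
    // the real parsing into our StreamBase-related types is omitted here
    val analyticsDStream = kafkaDStream.map { case (_, msg) =>
      (msg.takeWhile(_ != ','), msg)                         // placeholder key extraction
    }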
Log (repeating while stuck):

    15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_873_piece0
    15/06/06 12:55:20 INFO BlockManager: Removing broadcast 875
    15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875_piece0
    15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875_piece0 of size 1820 dropped from memory (free 3369792320)
    15/06/06 12:55:20 INFO BlockManagerMaster: Updated info of block broadcast_875_piece0
    15/06/06 12:55:20 INFO BlockManager: Removing block broadcast_875
    15/06/06 12:55:20 INFO MemoryStore: Block broadcast_875 of size 2624 dropped from memory (free 3369794944)
    15/06/06 12:56:13 INFO MemoryStore: ensureFreeSpace(1650825) called with curMem=24335467, maxMem=3394130411
    15/06/06 12:56:13 INFO MemoryStore: Block input-0-1433620573400 stored as bytes in memory (estimated size 1612.1 KB, free 3.1 GB)
    15/06/06 12:56:13 INFO BlockManagerMaster: Updated info of block input-0-1433620573400
    15/06/06 12:56:20 INFO MemoryStore: ensureFreeSpace(1682877) called with curMem=25986292, maxMem=3394130411
    15/06/06 12:56:20 INFO MemoryStore: Block input-0-1433620579800 stored as bytes in memory (estimated size 1643.4 KB, free 3.1 GB)
    15/06/06 12:56:20 INFO BlockManagerMaster: Updated info of block input-0-1433620579800
    15/06/06 12:56:25 INFO MemoryStore: ensureFreeSpace(1642661) called with curMem=27669169, maxMem=3394130411
    15/06/06 12:56:25 INFO MemoryStore: Block input-0-1433620585000 stored as bytes in memory (estimated size 1604.2 KB, free 3.1 GB)
    15/06/06 12:56:25 INFO BlockManagerMaster: Updated info of block input-0-1433620585000

Executor status:

Tasks:
- Index 1, task ID 9087, attempt 0: SUCCESS, PROCESS_LOCAL, executor 40 / 10.10.10.10, launched 2015/06/06 12:54:26, duration 3 s, input 7.7 MB (memory) / 11662 records, shuffle read 337.2 KB / 6588 records, errors: none
- Index 0, task ID 9086, attempt 0: RUNNING, PROCESS_LOCAL, executor 6 / 10.10.10.10, launched 2015/06/06 12:54:26, duration 3.7 min, GC time 2 s, input 7.8 MB (memory) / 11792 records, shuffle read 327.4 KB / 6422 records, errors: none

Memory: 39.0 MB used (160.1 GB total), Disk: 0.0 B used

Executors:
- Executor 6, 10.10.10.10:49298: 2 RDD blocks, memory used 15.6 MB / 3.2 GB, disk used 0.0 B, 1 active / 0 failed / 412 complete / 413 total tasks, task time 7.0 m, input 1291.3 MB, shuffle read 8.3 MB, shuffle write 1437.1 KB
- Executor 40, 10.10.10.10:37480: 3 RDD blocks, memory used 23.4 MB / 3.2 GB, disk used 0.0 B, 0 active / 0 failed / 413 complete / 413 total tasks, task time 7.1 m, input 1288.6 MB, shuffle read 10.8 MB, shuffle write 4.0 MB

Sample code:

    val stateDStream = analyticsDStream.updateStateByKey[StreamBase](
      updateAnalyticsIterFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      true)

    stateDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        partition.foreach(record => {
          // Do nothing, and it is still stuck
        })
      })
    })

There are no error messages, and the memory usage looks fine. Is there any reason it would get stuck, and how can I resolve this?

Thank you for your help,
Eason
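P.S. In case it helps, updateAnalyticsIterFunc has the iterator-based signature expected by this overload of updateStateByKey. The body below is only a simplified placeholder: the key/value types and the StreamBase methods shown are illustrative, not the real ones.

    // Placeholder sketch of the state-update function; the real merge logic is more involved.
    def updateAnalyticsIterFunc(
        iter: Iterator[(String, Seq[String], Option[StreamBase])]): Iterator[(String, StreamBase)] = {
      iter.map { case (key, newValues, oldState) =>
        val state = oldState.getOrElse(new StreamBase())   // start a fresh state for new keys
        newValues.foreach(v => state.add(v))               // placeholder: fold each new record into the state
        (key, state)
      }
    }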