I have a Spark Streaming dataset that is a union of 12 datasets (one for each of 12 different S3 buckets). On startup, it takes nearly 18-20 minutes for the Spark Streaming job to show up on the Spark Streaming UI, and an additional 18-20 minutes for the job to even start. Looking at the logs, I see something akin to the following:
25/04/04 01:45:31 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 36 file(s) in 53349 ms (53.3 seconds)
25/04/04 01:46:58 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 141076 file(s) in 86275 ms (86.3 seconds)
25/04/04 01:49:19 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 6 file(s) in 118100 ms (1.97 minutes)
25/04/04 01:50:32 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 4 file(s) in 73611 ms (1.23 minutes)
25/04/04 01:50:53 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 7 file(s) in 20323 ms (20.3 seconds)
25/04/04 01:51:03 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 41 file(s) in 10268 ms (10.3 seconds)
25/04/04 01:52:18 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 36 file(s) in 74333 ms (1.24 minutes)
25/04/04 01:54:37 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 3 file(s) in 139266 ms (2.32 minutes)
25/04/04 01:57:03 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 7 file(s) in 146221 ms (2.44 minutes)
25/04/04 01:59:52 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 37 file(s) in 168371 ms (2.81 minutes)
25/04/04 02:00:56 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 6 file(s) in 63648 ms (63.6 seconds)
25/04/04 02:03:27 WARN FileStreamSource: [queryId = 93a16] [batchId = 10] Listed 21 file(s) in 151401 ms (2.52 minutes)

That's a total listing time of 1,105,166 ms (18.4 minutes), which makes me suspect that each dataset is being listed sequentially. I believe the culprit is MicroBatchExecution's constructNextBatch method: each source is iterated over sequentially to get the next offsets and reset offsets, meaning one slow source (or multiple sources, in my case) slows down the entire processing time.

Is it feasible to have each source do this in parallel? Based on a quick spike through the code, it seems that ExecutionContext's updateStatusMessage method would need to be synchronized if we want it to be atomic, but with parallelism there will be no telling what the end state will be since it won't be deterministic. ExecutionContext's reportTimeTaken method will definitely need to be synchronized to ensure that each parallel thread writes one at a time.

Is there anything else I'm missing?

Regards,
Jevon Cowell
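P.S. To make the idea concrete, here is a rough, self-contained sketch of the kind of per-source parallelism I'm imagining. The StreamSource trait and the Long offset are placeholders I made up for this sketch, not Spark's real source/offset types, and this is not the actual constructNextBatch code; it just shows fetching each source's latest offset on its own thread instead of in a sequential loop.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for a streaming source; in Spark this would be the
// per-source work done inside MicroBatchExecution.constructNextBatch
// (updateStatusMessage + reportTimeTaken { source.latestOffset(...) }).
trait StreamSource {
  def name: String
  def latestOffset(): Option[Long] // placeholder for the real Offset type
}

object ParallelOffsetFetchSketch {
  def fetchAll(sources: Seq[StreamSource]): Map[StreamSource, Option[Long]] = {
    // One thread per source so a slow S3 listing doesn't block the others.
    val pool = Executors.newFixedThreadPool(sources.size)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = Future.traverse(sources) { s =>
        Future {
          // Any shared progress bookkeeping (updateStatusMessage,
          // reportTimeTaken) would have to be synchronized here, or
          // collected per-thread and merged after the Await.
          s -> s.latestOffset()
        }
      }
      Await.result(futures, Duration.Inf).toMap
    } finally {
      pool.shutdown()
    }
  }
}

With something along these lines, the total wait for a micro-batch would be roughly the slowest single listing rather than the sum of all twelve.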