I have a situation where multiple tests use Spark Streaming with a manual clock. The first test is fine and processes the data once I increment the clock up to the sliding window duration. The second test deviates and doesn't process any data. The traces below show the difference: in the working test, MemoryStore is called right after the receiver has finished loading the data for that set, whereas in the failing test MemoryStore is only called after the batch has started processing, once the manual clock has been incremented.
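For reference, the streaming context in these tests is set up roughly like this (simplified sketch; the helper class name is made up, "spark.streaming.clock" is the hook Spark's own streaming test suites use to swap in a manual clock, and the ManualClock class has moved between Spark versions, so the fully qualified name may need adjusting):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ManualClockContextFactory {
    public static JavaStreamingContext create() {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ReceiveDataFromFileEndToEndDNSTTest")
                // Drive batch generation from a manual clock instead of wall-clock time
                // (class location varies by Spark version).
                .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
        // 2000 ms batch duration, matching the traces below; the jobs use a 10000 ms
        // sliding window, and the test advances the manual clock in 2000 ms steps.
        return new JavaStreamingContext(conf, new Duration(2000));
    }
}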
The following is a trace that works.

15/06/02 10:39:18 INFO ManualLoadFileBasedReceiverDnsData: Took 78913544 nanos to load data
15/06/02 10:39:18 INFO MemoryStore: ensureFreeSpace(1624896) called with curMem=14071, maxMem=2061647216
15/06/02 10:39:18 INFO MemoryStore: Block input-0-1433266758000 stored as values in memory (estimated size 1586.8 KB, free 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerInfo: Added input-0-1433266758000 in memory on localhost:54349 (size: 1586.8 KB, free: 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerMaster: Updated info of block input-0-1433266758000
15/06/02 10:39:18 INFO BlockGenerator: Pushed block input-0-1433266758000
15/06/02 10:39:37 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 2000
15/06/02 10:39:37 INFO FlatMapValuedDStream: Time 2000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 2000 ms
15/06/02 10:39:37 INFO JobScheduler: No jobs added for time 2000 ms
15/06/02 10:39:37 INFO JobGenerator: Checkpointing graph for time 2000 ms
15/06/02 10:39:37 INFO DStreamGraph: Updating checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 4000
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 6000
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 4000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 4000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 4000 ms
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 6000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 6000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 6000 ms
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 2000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000'
15/06/02 10:39:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 2000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000', took 18291958 bytes and 87 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 8000
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 6000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 6000 ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 4000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000'
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 6000 ms
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 4000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000', took 18291960 bytes and 28 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 10000
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 6000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000'
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 8000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 8000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO StateDStream: Time 0 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 0 ms
15/06/02 10:39:38 INFO FlatMappedDStream: Slicing from 2000 ms to 10000 ms (aligned to 2000 ms and 10000 ms)
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 6000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000', took 18291960 bytes and 28 ms
15/06/02 10:39:38 INFO BaseListener: Time difference: 29951
15/06/02 10:39:38 INFO BaseListener: Time difference: 29908
15/06/02 10:39:38 INFO StateDStream: Marking RDD 18 for time 10000 ms for checkpointing
15/06/02 10:39:38 INFO JobScheduler: Added jobs for time 10000 ms
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 8000 ms
15/06/02 10:39:38 INFO JobScheduler: Starting job streaming job 10000 ms.0 from job set of time 10000 ms
15/06/02 10:39:38 INFO BaseListener: onBatchStarted

The following is a trace that fails.
15/06/02 10:39:50 INFO ManualLoadFileBasedReceiverDnsData: Took 7062177 nanos to load data
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 2000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 2000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 2000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 2000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 2000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 2000 ms
15/06/02 10:40:10 INFO CheckpointWriter: Saving checkpoint for time 2000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000'
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 4000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 4000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 4000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 4000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 4000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 4000 ms
15/06/02 10:40:10 INFO CheckpointWriter: Checkpoint for time 2000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000', took 18292157 bytes and 33 ms
15/06/02 10:40:10 INFO DStreamGraph: Clearing checkpoint data for time 2000 ms
15/06/02 10:40:10 INFO CheckpointWriter: Saving checkpoint for time 4000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000'
15/06/02 10:40:10 INFO DStreamGraph: Cleared checkpoint data for time 2000 ms
15/06/02 10:40:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 6000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 6000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 6000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 6000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 6000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 6000 ms
15/06/02 10:40:10 INFO CheckpointWriter: Checkpoint for time 4000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000', took 18292159 bytes and 28 ms
15/06/02 10:40:10 INFO CheckpointWriter: Saving checkpoint for time 6000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000'
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 8000 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 8000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 8000 ms
15/06/02 10:40:10 INFO DStreamGraph: Clearing checkpoint data for time 4000 ms
15/06/02 10:40:10 INFO DStreamGraph: Cleared checkpoint data for time 4000 ms
15/06/02 10:40:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:40:10 INFO StateDStream: Time 0 ms is invalid as zeroTime is 0 ms and slideDuration is 10000 ms and difference is 0 ms
15/06/02 10:40:10 INFO FlatMappedDStream: Slicing from 2000 ms to 10000 ms (aligned to 2000 ms and 10000 ms)
15/06/02 10:40:10 INFO StateDStream: Marking RDD 18 for time 10000 ms for checkpointing
15/06/02 10:40:10 INFO JobScheduler: Added jobs for time 10000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 8000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 8000 ms
15/06/02 10:40:10 INFO JobScheduler: Starting job streaming job 10000 ms.0 from job set of time 10000 ms
15/06/02 10:40:10 INFO BaseListener: onBatchStarted
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 8000 ms
15/06/02 10:40:10 INFO SparkContext: Starting job: foreachRDD at AbstractModelExecutor.java:247
15/06/02 10:40:10 INFO DAGScheduler: Registering RDD 16 (window at ModelExecutor.java:71)
15/06/02 10:40:10 INFO DAGScheduler: Got job 1 (foreachRDD at AbstractModelExecutor.java:247) with 2 output partitions (allowLocal=false)
15/06/02 10:40:10 INFO DAGScheduler: Final stage: Stage 2(foreachRDD at AbstractModelExecutor.java:247)
15/06/02 10:40:10 INFO DAGScheduler: Parents of final stage: List(Stage 1)
15/06/02 10:40:10 INFO DAGScheduler: Missing parents: List()
15/06/02 10:40:10 INFO DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[19] at flatMapValues at AbstractModelExecutor.java:224), which has no missing parents
15/06/02 10:40:10 INFO MemoryStore: ensureFreeSpace(9992) called with curMem=14071, maxMem=2061647216
15/06/02 10:40:10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 9.8 KB, free 1966.1 MB)
15/06/02 10:40:10 INFO MemoryStore: ensureFreeSpace(6352) called with curMem=24063, maxMem=2061647216
15/06/02 10:40:10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 6.2 KB, free 1966.1 MB)
15/06/02 10:40:10 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:54352 (size: 6.2 KB, free: 1966.1 MB)
15/06/02 10:40:10 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

I have a third streaming test that works, so it's not all UTs that fail. If I change the order of the two tests, the failure stays with whichever test runs second. The logs indicate the SparkContext is terminated before a new UT starts up. The point of going this route is to exercise as much of the existing code paths as possible, run the tests deterministically, and have verifiable results.

So a couple of questions from this:

- Can I determine when data stored through a receiver has been persisted into Spark, so that I can then increment the clock to trigger processing?
- I understand I could use a Queue for the data, but this is less desirable. Is using a Queue the only way to get deterministic processing for each batch?

Thanks,
Mark
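P.S. To make the second question concrete, by "use a Queue" I mean something along the lines of the sketch below (untested, names and data are placeholders); with queueStream and oneAtATime set to true each RDD pushed onto the queue becomes the input of exactly one batch, which is what would make the per-batch contents deterministic:

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class QueueStreamSketch {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2000));

        // With oneAtATime = true each batch consumes exactly one RDD from the queue,
        // so the contents of every batch are known up front.
        Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
        rddQueue.add(sc.parallelize(Arrays.asList("record1", "record2"))); // placeholder for the file-loaded test data

        JavaDStream<String> input = jssc.queueStream(rddQueue, true);
        input.print();

        jssc.start();
        Thread.sleep(15000);   // crude wait, just for the sketch
        jssc.stop(true, true);
    }
}

And to illustrate what I'm after with the first question: since ManualLoadFileBasedReceiverDnsData is our own class, I could in principle have the receiver signal the test once store() has returned (hypothetical sketch below, with a static latch as the hook), but I don't know whether store() returning actually guarantees the block has been handed over to the BlockManager - the difference between the two traces above suggests the block push can happen later - and that is really the heart of the question:

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class LatchSignallingReceiver extends Receiver<String> {
    // Static so the test and the (deserialized) receiver copy share the same latch when
    // running against a local master; purely a test convenience.
    public static final CountDownLatch DATA_STORED = new CountDownLatch(1);

    private final List<String> records; // stands in for the data the real receiver loads from file

    public LatchSignallingReceiver(List<String> records) {
        super(StorageLevel.MEMORY_ONLY());
        this.records = records;
    }

    @Override
    public void onStart() {
        store(records.iterator()); // hand the pre-loaded records to Spark
        DATA_STORED.countDown();   // tell the test it can start advancing the clock
    }

    @Override
    public void onStop() {
        // nothing to clean up in this sketch
    }
}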