I have a situation where multiple tests use Spark Streaming with a ManualClock. The first
test runs fine and processes the data when I advance the clock to the sliding window
duration. The second test deviates and doesn't process any data. The traces below show
that in the working test the MemoryStore is called right after the receiver has finished
loading the data for that set, whereas in the failing test the MemoryStore is only called
after the batch has already started processing, i.e. after the manual clock has been
advanced.
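For reference, the streaming context in these tests is built roughly like this (a simplified
sketch, not the actual test code; the factory class, master, and app name are illustrative,
and the ManualClock class name may differ on older Spark releases, where it lived under
org.apache.spark.streaming.util):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ManualClockContextFactory {
    public static JavaStreamingContext build() {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ReceiveDataFromFileEndToEndDNSTTest")
                // use the manual clock so batches fire only when the test advances time
                .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
        // 2 s batch interval, matching the 2000/4000/... clock steps in the traces below
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}

The test then advances the clock in 2000 ms steps up to the 10000 ms slide duration, which
is where the two traces diverge.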

The following is a trace that works.

15/06/02 10:39:18 INFO ManualLoadFileBasedReceiverDnsData: Took 78913544
nanos to load data
15/06/02 10:39:18 INFO MemoryStore: ensureFreeSpace(1624896) called with
curMem=14071, maxMem=2061647216
15/06/02 10:39:18 INFO MemoryStore: Block input-0-1433266758000 stored as
values in memory (estimated size 1586.8 KB, free 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerInfo: Added input-0-1433266758000 in
memory on localhost:54349 (size: 1586.8 KB, free: 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerMaster: Updated info of block
input-0-1433266758000
15/06/02 10:39:18 INFO BlockGenerator: Pushed block input-0-1433266758000
15/06/02 10:39:37 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
2000
15/06/02 10:39:37 INFO FlatMapValuedDStream: Time 2000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 2000 ms
15/06/02 10:39:37 INFO JobScheduler: No jobs added for time 2000 ms
15/06/02 10:39:37 INFO JobGenerator: Checkpointing graph for time 2000 ms
15/06/02 10:39:37 INFO DStreamGraph: Updating checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
4000
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
6000
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 4000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 4000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 4000 ms
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 6000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 6000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 6000 ms
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 2000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000'
15/06/02 10:39:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 2000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000',
took 18291958 bytes and 87 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
8000
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 6000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 6000
ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 4000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000'
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 6000
ms
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 4000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000',
took 18291960 bytes and 28 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is
10000
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 6000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000'
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 2000
ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 8000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 8000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 4000
ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO StateDStream: Time 0 ms is invalid as zeroTime is 0
ms and slideDuration is 10000 ms and difference is 0 ms
15/06/02 10:39:38 INFO FlatMappedDStream: Slicing from 2000 ms to 10000 ms
(aligned to 2000 ms and 10000 ms)
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 6000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000',
took 18291960 bytes and 28 ms
15/06/02 10:39:38 INFO BaseListener: Time difference: 29951
15/06/02 10:39:38 INFO BaseListener: Time difference: 29908
15/06/02 10:39:38 INFO StateDStream: Marking RDD 18 for time 10000 ms for
checkpointing
15/06/02 10:39:38 INFO JobScheduler: Added jobs for time 10000 ms
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 8000
ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 8000
ms
15/06/02 10:39:38 INFO JobScheduler: Starting job streaming job 10000 ms.0
from job set of time 10000 ms
15/06/02 10:39:38 INFO BaseListener: onBatchStarted

The following is a trace that fails.

15/06/02 10:39:50 INFO ManualLoadFileBasedReceiverDnsData: Took 7062177
nanos to load data
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 2000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 2000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 2000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 2000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 2000
ms
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 2000
ms
15/06/02 10:40:10 INFO CheckpointWriter: Saving checkpoint for time 2000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000'
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 4000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 4000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 4000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 4000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 4000
ms
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 4000
ms
15/06/02 10:40:10 INFO CheckpointWriter: Checkpoint for time 2000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000',
took 18292157 bytes and 33 ms
15/06/02 10:40:10 INFO DStreamGraph: Clearing checkpoint data for time 2000
ms
15/06/02 10:40:10 INFO CheckpointWriter: Saving checkpoint for time 4000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000'
15/06/02 10:40:10 INFO DStreamGraph: Cleared checkpoint data for time 2000
ms
15/06/02 10:40:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 6000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 6000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 6000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 6000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 6000
ms
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 6000
ms
15/06/02 10:40:10 INFO CheckpointWriter: Checkpoint for time 4000 ms saved
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000',
took 18292159 bytes and 28 ms
15/06/02 10:40:10 INFO CheckpointWriter: Saving checkpoint for time 6000 ms
to file
'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000'
15/06/02 10:40:10 INFO FlatMapValuedDStream: Time 8000 ms is invalid as
zeroTime is 0 ms and slideDuration is 10000 ms and difference is 8000 ms
15/06/02 10:40:10 INFO JobScheduler: No jobs added for time 8000 ms
15/06/02 10:40:10 INFO DStreamGraph: Clearing checkpoint data for time 4000
ms
15/06/02 10:40:10 INFO DStreamGraph: Cleared checkpoint data for time 4000
ms
15/06/02 10:40:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:40:10 INFO StateDStream: Time 0 ms is invalid as zeroTime is 0
ms and slideDuration is 10000 ms and difference is 0 ms
15/06/02 10:40:10 INFO FlatMappedDStream: Slicing from 2000 ms to 10000 ms
(aligned to 2000 ms and 10000 ms)
15/06/02 10:40:10 INFO StateDStream: Marking RDD 18 for time 10000 ms for
checkpointing
15/06/02 10:40:10 INFO JobScheduler: Added jobs for time 10000 ms
15/06/02 10:40:10 INFO JobGenerator: Checkpointing graph for time 8000 ms
15/06/02 10:40:10 INFO DStreamGraph: Updating checkpoint data for time 8000
ms
15/06/02 10:40:10 INFO JobScheduler: Starting job streaming job 10000 ms.0
from job set of time 10000 ms
15/06/02 10:40:10 INFO BaseListener: onBatchStarted
15/06/02 10:40:10 INFO DStreamGraph: Updated checkpoint data for time 8000
ms
15/06/02 10:40:10 INFO SparkContext: Starting job: foreachRDD at
AbstractModelExecutor.java:247
15/06/02 10:40:10 INFO DAGScheduler: Registering RDD 16 (window at
ModelExecutor.java:71)
15/06/02 10:40:10 INFO DAGScheduler: Got job 1 (foreachRDD at
AbstractModelExecutor.java:247) with 2 output partitions (allowLocal=false)
15/06/02 10:40:10 INFO DAGScheduler: Final stage: Stage 2(foreachRDD at
AbstractModelExecutor.java:247)
15/06/02 10:40:10 INFO DAGScheduler: Parents of final stage: List(Stage 1)
15/06/02 10:40:10 INFO DAGScheduler: Missing parents: List()
15/06/02 10:40:10 INFO DAGScheduler: Submitting Stage 2
(MapPartitionsRDD[19] at flatMapValues at AbstractModelExecutor.java:224),
which has no missing parents
15/06/02 10:40:10 INFO MemoryStore: ensureFreeSpace(9992) called with
curMem=14071, maxMem=2061647216
15/06/02 10:40:10 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 9.8 KB, free 1966.1 MB)
15/06/02 10:40:10 INFO MemoryStore: ensureFreeSpace(6352) called with
curMem=24063, maxMem=2061647216
15/06/02 10:40:10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 6.2 KB, free 1966.1 MB)
15/06/02 10:40:10 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on localhost:54352 (size: 6.2 KB, free: 1966.1 MB)
15/06/02 10:40:10 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0

I have a third streaming test that works, so it's not that all the unit tests fail. If I
change the order of the two tests, the failure stays with whichever test runs second. The
logs indicate the SparkContext is terminated before a new unit test starts up. The point of
going this route is to reuse as much of the existing code paths as possible so the tests run
deterministically and produce verifiable results.
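Between tests the contexts are torn down along these lines (a sketch; the base class and
field names are just illustrative), which is why I expect each test to start from a clean
SparkContext:

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.After;

public abstract class StreamingTestBase {
    protected JavaStreamingContext jssc;

    @After
    public void tearDown() {
        if (jssc != null) {
            // true = also stop the underlying SparkContext
            jssc.stop(true);
            jssc = null;
        }
    }
}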

So, a couple of questions come out of this.

- Can I determine when data stored through a receiver has actually been persisted into
Spark, so that I can then advance the clock to trigger processing? (See the first sketch
after this list.)
- I understand I could use a Queue for the data, but that is less desirable. Is a Queue the
only way to get deterministic processing for each batch? (See the second sketch after this
list.)
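
To make the first question concrete, the only hook I have come up with is signalling the
test from inside the receiver once everything has been handed to store(), roughly like the
sketch below (stand-in names, not my actual receiver). I suspect it is not sufficient,
because single-record store() only hands the data to the BlockGenerator, which pushes the
block asynchronously, and that gap is exactly what I am trying to close.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Sketch: the receiver counts down a latch once all records have been passed to store(),
// and the test awaits the latch before advancing the manual clock. This only works in
// local mode (receiver in the same JVM), and store(record) merely queues the data with
// the BlockGenerator, so the block may not yet be in the MemoryStore.
public class LatchedFileReceiver extends Receiver<String> {
    public static final CountDownLatch DATA_HANDED_OFF = new CountDownLatch(1);

    public LatchedFileReceiver() {
        super(StorageLevel.MEMORY_ONLY());
    }

    @Override
    public void onStart() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (String record : loadRecords()) {
                    store(record);
                }
                DATA_HANDED_OFF.countDown();
            }
        }).start();
    }

    @Override
    public void onStop() {
    }

    private List<String> loadRecords() {
        // stand-in for my file-based loading
        return Arrays.asList("record-1", "record-2");
    }
}

The test would then await LatchedFileReceiver.DATA_HANDED_OFF before advancing the clock,
but that still leaves the asynchronous block push unaccounted for.

For the second question, the Queue-based alternative I am trying to avoid would look roughly
like this (again a sketch); it is deterministic because with oneAtATime each queued RDD
becomes exactly one batch, but it bypasses the receiver and BlockGenerator code path I
actually want to exercise:

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class QueueBasedInput {
    // Deterministic input without a receiver: each queued RDD becomes one batch.
    public static JavaDStream<String> build(JavaStreamingContext jssc) {
        Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
        rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("record-1", "record-2")));
        return jssc.queueStream(rddQueue, true); // oneAtATime = true
    }
}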

Thanks,

Mark


