There are two kinds of checkpointing going on here - metadata and data. The
100-second interval (the default of 10 x your batch interval) is the data
checkpoint (expensive, large data), where the RDD state data is written to
HDFS. The 10-second one is the metadata checkpoint (cheap, small data), where
the metadata of the query (Kafka offsets, etc.) is saved before and after
every 10-second batch - which is why you see two small writes per batch.
Hope this clarifies.
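
To make the two concrete, here is a rough self-contained sketch - a local toy
job, not your actual pipeline; the socket source, local master, paths and
names are stand-ins - showing where each kind of write comes from:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

    object CheckpointKindsSketch {
      def main(args: Array[String]): Unit = {
        // 10-second batches, as in the job below. App name, master and paths
        // are placeholders for a local test run.
        val conf = new SparkConf()
          .setAppName("checkpoint-kinds-sketch")
          .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Metadata checkpointing: the DStream graph, pending batches and (in
        // your job) the Kafka offsets are written to this directory around
        // every batch. These are the small, cheap writes every 10 seconds.
        ssc.checkpoint("/tmp/checkpoint-kinds-sketch")

        // Stand-in source; the real job reads from the Kafka 0-10 direct stream.
        val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

        // Running count per key, analogous to the mapWithState step in the job.
        def updateCount(key: String, value: Option[Int], state: State[Int]): (String, Int) = {
          val newCount = state.getOption.getOrElse(0) + value.getOrElse(0)
          state.update(newCount)
          (key, newCount)
        }
        val stateStream = events.mapWithState(StateSpec.function(updateCount _))

        // Data checkpointing: the internal state RDDs behind mapWithState are
        // persisted to the checkpoint directory every 10 x the batch interval
        // by default (100 seconds here). These are the large, expensive writes.
        stateStream.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

With a checkpoint directory set, the small metadata files go out around every
10-second batch, while the state RDDs behind mapWithState are only
materialized to HDFS every 100 seconds.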

On Thu, Jun 1, 2017 at 2:54 PM, David Rosenstrauch <daro...@gmail.com>
wrote:

> I'm running into a weird issue with a stateful streaming job I'm running.
> (Spark 2.1.0 reading from kafka 0-10 input stream.)
>
> From what I understand from the docs, by default the checkpoint interval
> for stateful streaming is 10 * batchInterval.  Since I'm running a batch
> interval of 10 seconds, I would expect that checkpointing should only get
> done every 100 seconds.  But what I'm seeing is that Spark is not only
> checkpointing every 10 seconds, it's checkpointing twice every 10 seconds!
>
> My code looks approximately like the following:
>
>     val eventStream = kafkaStream.
>         transform(
> ...
>         ).
>         map(
> ...
>         ).
>         transform(
> ...
>         )
>
>     val stateStream = eventStream.mapWithState(
> ...
>     )
>
>     stateStream.foreachRDD(
> ...
>     )
>
>
> When the app initializes, the checkpointing configuration looks like so:
>
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Duration for remembering
> RDDs set to 200000 ms for org.apache.spark.streaming.kafka010.
> DirectKafkaInputDStream@4a85a52c
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Storage level = Serialized
> 1x Replicated
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Remember interval = 200000
> ms
> 17/06/01 21:19:05 INFO DirectKafkaInputDStream: Initialized and validated
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
> 17/06/01 21:19:05 INFO TransformedDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x
> Replicated
> 17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.TransformedDStream@201d4bfb
> 17/06/01 21:19:05 INFO MappedDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO MappedDStream: Storage level = Serialized 1x
> Replicated
> 17/06/01 21:19:05 INFO MappedDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO MappedDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO MappedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.MappedDStream@1208bde7
> 17/06/01 21:19:05 INFO TransformedDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x
> Replicated
> 17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 200000 ms
> 17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated
> org.apache.spark.streaming.dstream.TransformedDStream@370b0505
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Storage level = Memory
> Deserialized 1x Replicated
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval =
> 100000 ms
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Remember interval =
> 200000 ms
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Initialized and
> validated org.apache.spark.streaming.dstream.InternalMapWithStateDStream@
> 746c7658
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Storage level = Serialized
> 1x Replicated
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Checkpoint interval = null
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Remember interval = 10000
> ms
> 17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Initialized and validated
> org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@75d7326b
> 17/06/01 21:19:05 INFO ForEachDStream: Slide time = 10000 ms
> 17/06/01 21:19:05 INFO ForEachDStream: Storage level = Serialized 1x
> Replicated
> 17/06/01 21:19:05 INFO ForEachDStream: Checkpoint interval = null
> 17/06/01 21:19:05 INFO ForEachDStream: Remember interval = 10000 ms
> 17/06/01 21:19:05 INFO ForEachDStream: Initialized and validated
> org.apache.spark.streaming.dstream.ForEachDStream@2b3b2628
>
>
> Note that there's one line that's correctly showing the 100 second
> checkpointing interval:
>
> 17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval =
> 100000 ms
>
>
> And yet the app is still performing checkpointing every 10 seconds ...
> twice every 10 seconds, in fact!
>
> 17/06/01 21:19:10 INFO CheckpointWriter: Submitted checkpoint of time
> 1496351950000 ms to writer queue
> 17/06/01 21:19:10 INFO CheckpointWriter: Saving checkpoint for time
> 1496351950000 ms to file 'hdfs://hadoopmaster01:8020/
> user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351950000'
> 17/06/01 21:19:10 INFO CheckpointWriter: Checkpoint for time 1496351950000
> ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/
> profilebuild/checkpoint-1496351950000', took 8324 bytes and 165 ms
> 17/06/01 21:19:11 INFO CheckpointWriter: Submitted checkpoint of time
> 1496351950000 ms to writer queue
> 17/06/01 21:19:11 INFO CheckpointWriter: Saving checkpoint for time
> 1496351950000 ms to file 'hdfs://hadoopmaster01:8020/
> user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351950000'
> 17/06/01 21:19:11 INFO CheckpointWriter: Checkpoint for time 1496351950000
> ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/
> profilebuild/checkpoint-1496351950000', took 8321 bytes and 22 ms
> 17/06/01 21:19:20 INFO CheckpointWriter: Submitted checkpoint of time
> 1496351960000 ms to writer queue
> 17/06/01 21:19:20 INFO CheckpointWriter: Saving checkpoint for time
> 1496351960000 ms to file 'hdfs://hadoopmaster01:8020/
> user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351960000'
> 17/06/01 21:19:20 INFO CheckpointWriter: Checkpoint for time 1496351960000
> ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/
> profilebuild/checkpoint-1496351960000', took 8390 bytes and 20 ms
> 17/06/01 21:19:20 INFO CheckpointWriter: Submitted checkpoint of time
> 1496351960000 ms to writer queue
> 17/06/01 21:19:20 INFO CheckpointWriter: Saving checkpoint for time
> 1496351960000 ms to file 'hdfs://hadoopmaster01:8020/
> user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351960000'
> 17/06/01 21:19:20 INFO CheckpointWriter: Checkpoint for time 1496351960000
> ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/
> profilebuild/checkpoint-1496351960000', took 8386 bytes and 24 ms
> 17/06/01 21:19:30 INFO CheckpointWriter: Submitted checkpoint of time
> 1496351970000 ms to writer queue
> 17/06/01 21:19:30 INFO CheckpointWriter: Saving checkpoint for time
> 1496351970000 ms to file 'hdfs://hadoopmaster01:8020/
> user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351970000'
> 17/06/01 21:19:30 INFO CheckpointWriter: Checkpoint for time 1496351970000
> ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/
> profilebuild/checkpoint-1496351970000', took 8398 bytes and 25 ms
> 17/06/01 21:19:30 INFO CheckpointWriter: Submitted checkpoint of time
> 1496351970000 ms to writer queue
> 17/06/01 21:19:30 INFO CheckpointWriter: Saving checkpoint for time
> 1496351970000 ms to file 'hdfs://hadoopmaster01:8020/
> user/dmx/checkpoint/dev/profilebuild/checkpoint-1496351970000'
> 17/06/01 21:19:30 INFO CheckpointWriter: Checkpoint for time 1496351970000
> ms saved to file 'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/
> profilebuild/checkpoint-1496351970000', took 8394 bytes and 22 ms
>
>
> Anyone have an idea what's going wrong here, and/or how to fix this issue?
>
> Thanks,
>
> DR
