I'll check the log info message.
Meanwhile, the code is basically:
public class KafkaSparkStreamingDriver implements Serializable {
  ......
  SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
  JavaStreamingContext jssc = params.isCheckpointed()
      ? createCheckpointedContext(sparkConf, params)
      : createContext(sparkConf, params);
  jssc.start();
  jssc.awaitTermination();
  jssc.close();
  ......
  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf, Parameters params) {
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(sparkConf, params);
      }
    };
    return JavaStreamingContext.getOrCreate(params.getCheckpointDir(), factory);
  }
  .......
  private JavaStreamingContext createContext(SparkConf sparkConf, Parameters params) {
    // Create context with the specified batch interval, in milliseconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
        Durations.milliseconds(params.getBatchDurationMillis()));
    // Set the checkpoint directory, if we're checkpointing
    if (params.isCheckpointed()) {
      jssc.checkpoint(params.getCheckpointDir());
    }
    Set<String> topicsSet = new HashSet<String>(Arrays.asList(params.getTopic()));
    // Set the Kafka parameters.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params.getBrokerList());
    if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
      kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params.getAutoOffsetReset());
    }
    // Create direct Kafka stream with the brokers and the topic.
    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet);
    // See if there's an override of the default checkpoint duration.
    if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
      messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
    }
    .............
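
For what it's worth, here is a minimal sketch of the arithmetic in the DStream snippet Sean quoted (below), written in plain Java with no Spark dependency. The class and method names are hypothetical, and it assumes the slide duration of the direct Kafka stream equals the batch duration. Under that assumption, a 1 second batch would give a default checkpoint interval of 10 seconds.

```java
public class DefaultCheckpointInterval {

    // Hypothetical helper, not part of Spark. It mirrors the quoted logic:
    // checkpointDuration = slideDuration * ceil(10 seconds / slideDuration)
    static long defaultCheckpointMillis(long slideMillis) {
        if (slideMillis <= 0) {
            throw new IllegalArgumentException("slideMillis must be positive");
        }
        // Floating-point division, then round up to a whole number of slides.
        long multiples = (long) Math.ceil(10_000.0 / slideMillis);
        return slideMillis * multiples;
    }

    public static void main(String[] args) {
        // Assumed slide durations in milliseconds, for illustration only.
        long[] slides = {1000L, 3000L, 15000L};
        for (long slide : slides) {
            System.out.println(slide + " ms slide -> "
                + defaultCheckpointMillis(slide) + " ms checkpoint interval");
        }
    }
}
```

If that holds, nothing would be checkpointed for the DStream until at least 10 seconds of batches have run, which may be worth ruling out before looking elsewhere.
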
On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <[email protected]> wrote:
> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
>   ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
>     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>   }
>
> Do you see that log message? What's the interval? That could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong though since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
> <[email protected]> wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories but I'm not
> > seeing any content being created in those directories nor am I seeing the
> > effects I'd expect from checkpointing. I'd expect any data that comes into
> > Kafka while the consumers are down, to get picked up when the consumers are
> > restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system with the
> > directory URI being in this form: file:///mnt/dir1/dir2. I see a
> > subdirectory named with a UUID being created under there but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the DStream
> > since I want to first rely on Spark's default checkpointing interval. My
> > streaming batch duration is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
>