So if WAL is disabled, how can a developer commit offsets explicitly in a
Spark Streaming app, given that we don't write the code that is executed in
the receiver?

Also, since offset commits are asynchronous, is it possible that the last
offset has not been committed yet when the next stream batch starts on the
receiver, so that it gets duplicate data?
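
To make the scenario I am asking about concrete, here is a minimal sketch in
plain Java, with no Spark or Kafka dependency. FakePartition, readBatch and
run are hypothetical names used only for illustration, and the model assumes
that a restarted receiver resumes from the last committed offset. It is not
how the receiver is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

class AsyncCommitDemo {
    // Hypothetical stand-in for one partition's log plus the offset stored in ZooKeeper.
    static final class FakePartition {
        final List<String> log = new ArrayList<>();
        long committedOffset = 0; // offset a restarted consumer resumes from
    }

    // Reads up to batchSize records starting at offset 'from'.
    static List<String> readBatch(FakePartition p, long from, int batchSize) {
        int start = (int) from;
        int end = Math.min(p.log.size(), start + batchSize);
        return new ArrayList<>(p.log.subList(start, end));
    }

    // Processes one batch, optionally commits, then simulates a receiver restart.
    static List<String> run(boolean commitBeforeCrash) {
        FakePartition p = new FakePartition();
        for (int i = 0; i < 6; i++) {
            p.log.add("m" + i);
        }

        List<String> processed = new ArrayList<>();

        // First batch: received and processed.
        long position = p.committedOffset;
        List<String> first = readBatch(p, position, 3);
        processed.addAll(first);
        position += first.size();

        // The timer-based commit may or may not have fired before the failure.
        if (commitBeforeCrash) {
            p.committedOffset = position;
        }

        // Receiver restarts: the in-memory position is lost, only the commit survives.
        position = p.committedOffset;
        processed.addAll(readBatch(p, position, 3));
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [m0, m1, m2, m3, m4, m5]
        System.out.println(run(false)); // [m0, m1, m2, m0, m1, m2]
    }
}
```

In this model, when the commit lags behind processing, the first batch is
read twice after the restart, which is the duplicate case described above.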

On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai <[email protected]> wrote:

> If you disable WAL, Spark Streaming itself will not manage anything
> offset-related. If auto commit is enabled, Kafka itself will update
> offsets on a timer; if auto commit is disabled, nothing will call
> commitOffset, so you need to call this API yourself.
>
>
>
> Also, Kafka’s offset commit mechanism is timer-based, so it is
> asynchronous with respect to replication.
>
>
>
> *From:* Shushant Arora [mailto:[email protected]]
> *Sent:* Monday, July 6, 2015 8:30 PM
> *To:* Shao, Saisai
> *Cc:* user
> *Subject:* Re: kafka offset commit in spark streaming 1.2
>
>
>
> And what if I disable WAL and replicate receiver data using
> StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating
> the message, or will it use the autocommit.enable value? And if it uses
> this value and autocommit.enable is set to false, when does the receiver
> call commitOffset?
>
>
>
> On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai <[email protected]>
> wrote:
>
> If you’re using WAL with Kafka, Spark Streaming will ignore this
> configuration (autocommit.enable) and explicitly call commitOffset to
> update the offset in Kafka AFTER the WAL write is done.
>
>
>
> No matter what you set for autocommit.enable, Spark Streaming will
> internally set it to false to turn off the autocommit mechanism.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Shushant Arora [mailto:[email protected]]
> *Sent:* Monday, July 6, 2015 8:11 PM
> *To:* user
> *Subject:* kafka offset commit in spark streaming 1.2
>
>
>
> In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated
> in ZooKeeper only after writing to the WAL, if WAL and checkpointing are
> enabled, or does it depend on the kafkaParams passed when initializing the
> Kafka DStream?
>
>
>
>
>
> Map<String,String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect", "ip:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
> kafkaParams.put("autocommit.enable", "true");
> kafkaParams.put("zookeeper.sync.time.ms", "250");
>
> kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>         kafka.serializer.DefaultDecoder.class,
>         kafka.serializer.DefaultDecoder.class,
>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>
>
>
>
>
> Here, since I have set autocommit.enable to true, will Spark Streaming
> ignore this and always call commitOffset explicitly on the high-level
> consumer connector, or does it depend on the parameter passed?
>
>
>
> If it depends on the parameter, and the receiver calls an explicit commit
> only when autocommit is false, then I should override the default
> autocommit from true to false when enabling WAL, since with WAL enabled
> and autocommit set to true a failure may produce duplicates.
>
>
>
