So if WAL is disabled, how can a developer commit offsets explicitly in a Spark Streaming app, since we don't write the code that is executed in the receiver?
Also, since offset commits are asynchronous, is it possible that the last offset has not been committed yet when the next batch starts on the receiver, so that it gets duplicate data?

On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai <[email protected]> wrote:

> If you disable WAL, Spark Streaming itself will not manage any
> offset-related things. If auto commit is enabled, Kafka itself will
> update offsets in a time-based way. If auto commit is disabled, no
> part will call commitOffset; you need to call this API yourself.
>
> Also, Kafka’s offset commit mechanism is actually timer-based, so it
> is asynchronous with respect to replication.
>
> *From:* Shushant Arora [mailto:[email protected]]
> *Sent:* Monday, July 6, 2015 8:30 PM
> *To:* Shao, Saisai
> *Cc:* user
> *Subject:* Re: kafka offset commit in spark streaming 1.2
>
> And what if I disable WAL and replicate receiver data using
> StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after
> replicating the message, or will it use the autocommit.enable value?
> And if it uses this value, what if autocommit.enable is set to false:
> when does the receiver call commitOffset?
>
> On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai <[email protected]>
> wrote:
>
> If you’re using WAL with Kafka, Spark Streaming will ignore this
> configuration (autocommit.enable) and explicitly call commitOffset to
> update the offset in Kafka AFTER the WAL write is done.
>
> No matter what you set for autocommit.enable, Spark Streaming will
> internally set it to false to turn off the autocommit mechanism.
>
> Thanks
> Jerry
>
> *From:* Shushant Arora [mailto:[email protected]]
> *Sent:* Monday, July 6, 2015 8:11 PM
> *To:* user
> *Subject:* kafka offset commit in spark streaming 1.2
>
> In Spark Streaming 1.2, are the offsets of consumed Kafka messages
> updated in ZooKeeper only after writing to the WAL if WAL and
> checkpointing are enabled, or does it depend on the kafkaParams used
> while initializing the Kafka DStream?
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("zookeeper.connect", "ip:2181");
> kafkaParams.put("group.id", "testgroup");
> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
> kafkaParams.put("autocommit.enable", "true");
> kafkaParams.put("zookeeper.sync.time.ms", "250");
>
> kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>     byte[].class, kafka.serializer.DefaultDecoder.class,
>     kafka.serializer.DefaultDecoder.class,
>     kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>
> Here, since I have set autocommit.enable to true, will Spark Streaming
> ignore this and always call commitOffset explicitly on the high-level
> consumer connector, or does it depend on the parameter passed?
>
> If it depends on the parameter, and the receiver calls an explicit
> commit only when autocommit is false, then I should override the
> default autocommit from true to false when enabling WAL, since it may
> produce duplicates on failure if WAL is enabled and autocommit is true.
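
On the duplicate question: if offsets are committed on a timer, a restart can replay records that were consumed after the last committed offset, so the processing side generally has to tolerate duplicates. Below is a minimal sketch of one way to do that, assuming each record carries its topic, partition, and offset. OffsetDeduplicator and markIfNew are hypothetical names, not Spark or Kafka APIs, and the in-memory map used here would have to live in a durable store to survive a real restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Spark or Kafka): skips records that were already processed. */
final class OffsetDeduplicator {
    // Highest offset processed so far, keyed by "topic-partition".
    private final Map<String, Long> lastProcessed = new HashMap<>();

    /** Returns true if the record is new and records its offset; false if it is a replay. */
    boolean markIfNew(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long last = lastProcessed.get(key);
        if (last != null && offset <= last) {
            return false; // offset already seen: replayed after a lagging commit
        }
        lastProcessed.put(key, offset);
        return true;
    }

    public static void main(String[] args) {
        OffsetDeduplicator dedup = new OffsetDeduplicator();
        // Offsets 0..4 were consumed, but only offset 2 had been committed,
        // so 3 and 4 are delivered again after a simulated restart.
        long[] delivered = {0, 1, 2, 3, 4, 3, 4, 5};
        List<Long> processed = new ArrayList<>();
        for (long offset : delivered) {
            if (dedup.markIfNew("testtopic", 0, offset)) {
                processed.add(offset);
            }
        }
        System.out.println(processed); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

This relies on offsets increasing within a partition, which is why the state is kept per topic and partition rather than globally.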
