Cody,

Thanks for the message.

1. As you mentioned, I did find the connector version for Kafka 0.10.1 and will use that, although it carries a lot of experimental tags. Thank you.

2. I have investigated thoroughly: it is NOT the scenario where the 1st processing attempt failed and a 2nd attempt was triggered.

3. I agree that the session timeout and auto commit are not the root cause here.

4. The problem I see looks to be caused by a filter and union of the DStream (I will try to elaborate in another question post).

If I just do

    kafka-stream -- process -- output operator

then there is no problem. If I do

    kafka-stream -- process(1) -- filter a stream A for later union --|
                              |__ filter a stream B -- process(2) ----|__ A union B -- output process(3)

then the duplicate processing of the same message starts right at the end of process(1). See the sketch of the topology below, followed by the traces.
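A minimal sketch of that topology, assuming the 0.10 connector (spark-streaming-kafka-0-10). The class name, the process(1)/process(2) bodies and the filter predicates are placeholders, not my real code:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    public class FilterUnionSketch {
      public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("filter-union-sketch").setMaster("local[*]");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.2.6:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "testgid");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("log-analysis-topic");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                jsc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // process(1): stand-in for the real flatMapToPair at SparkAppDriver.java:136
        JavaDStream<String> processed = stream.map(ConsumerRecord::value);

        // NOTE: 'processed' is not persisted, so each branch below may recompute the
        // upstream KafkaRDD, i.e. fetch the same offsets from Kafka again.
        JavaDStream<String> streamA = processed.filter(v -> v.contains("A"));
        JavaDStream<String> streamB = processed.filter(v -> v.contains("B"))
            .map(String::toUpperCase);                 // process(2) stand-in

        // process(3): union of A and B, then the output operation
        streamA.union(streamB)
            .foreachRDD(rdd -> rdd.foreach(v -> System.out.println(v)));

        jsc.start();
        jsc.awaitTermination();
      }
    }

(Persisting the shared stream, e.g. processed.cache(), might avoid the second fetch, but the sketch mirrors what I currently do.)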
The traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1   (fetch of the EVENT, 1st time)
16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid.

[... log of processing (1) for event 1 ...]

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36). 1401 bytes result sent to driver
16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 36) in 3494 ms on localhost (3/3)
16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair at SparkAppDriver.java:136) finished in 3.506 s   (this is processing (1))
16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
16/09/10 00:11:03 INFO DAGScheduler: running: Set()
16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10, ResultStage 11)
16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155), which has no missing parents   (this is process (3))
16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, partition 2 offsets 1 -> 2
16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1   (fetch of the same EVENT, 2nd time)
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491460000 ms.0 from job set of time 1473491460000 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time 1473491460000 ms (execution: 10.874 s)   (1st processing of the EVENT took 10.874 s)
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491465000 ms.0 from job set of time 1473491465000 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time 1473491465000 ms (execution: 0.066 s)   (2nd processing of the EVENT took 0.066 s)

The 2nd processing of the event finished without really doing the work.

2016-09-08 14:55 GMT-07:00 Cody Koeninger <c...@koeninger.org>:

> - If you're seeing repeated attempts to process the same message, you
> should be able to look in the UI or logs and see that a task has
> failed. Figure out why that task failed before chasing other things
>
> - You're not using the latest version, the latest version is for spark
> 2.0. There are two versions of the connector for spark 2.0, one for
> kafka 0.8 or higher, and one for kafka 0.10 or higher
>
> - Committing individual messages to kafka doesn't make any sense,
> spark streaming deals with batches. If you're doing any aggregations
> that involve shuffling, there isn't even a guarantee that you'll
> process messages in order for a given topicpartition
>
> - Auto commit has no effect for the 0.8 version of createDirectStream.
> Turning it on for the 0.10 version of createDirectStream is a really
> bad idea, it will give you undefined delivery semantics, because the
> commit to Kafka is unrelated to whether the batch processed
> successfully
>
> If you're unclear on how the kafka integration works, see
>
> https://github.com/koeninger/kafka-exactly-once
>
> On Thu, Sep 8, 2016 at 4:44 PM, Cheng Yi <phillipchen...@gmail.com> wrote:
> > I am using the latest streaming kafka connector
> >
> >     <groupId>org.apache.spark</groupId>
> >     <artifactId>spark-streaming-kafka_2.11</artifactId>
> >     <version>1.6.2</version>
> >
> > I am facing the problem that a message is delivered two times to my
> > consumers. These two deliveries are 10+ seconds apart; it looks like this is
> > caused by my lengthy message processing (about 60 seconds). I tried to solve
> > this, but I am still stuck.
> >
> > 1. It looks like the kafka streaming connector supports kafka v0.8 and maybe
> > v0.9, but not v0.10:
> >
> >     JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(
> >         jsc, String.class, String.class, StringDecoder.class, StringDecoder.class,
> >         kafkaParams, topicsSet);
> >
> > 2. After I get a message from the kafka stream via the consumer, how can I
> > commit the message without finishing the whole processing (which might take
> > minutes)? It looks like I can't get the consumer from KafkaUtils to execute
> > the kafka commit API.
> >
> > 3. If I can't do a manual commit, then I need to tell the Kafka consumer to
> > allow a longer session or to auto commit. For v0.8 or v0.9 I have tried to
> > pass the following properties to KafkaUtils:
> >
> >     kafkaParams.put("auto.commit.enable", "true");
> >     kafkaParams.put("auto.commit.interval.ms", "1000");
> >     kafkaParams.put("zookeeper.session.timeout.ms", "60000");
> >     kafkaParams.put("zookeeper.connection.timeout.ms", "60000");
> >
> > Still not working.
> > Help is greatly appreciated!
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
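P.S. On Cody's point about commits with the 0.10 connector: as I understand the docs, the unit of commit is the batch's offset ranges, committed after the output operation, not individual messages. A rough sketch of that pattern, assuming a stream created as in the sketch earlier in this mail ('stream' is that JavaInputDStream); this is my reading of the documented API, not code I have run yet:

    import org.apache.spark.streaming.kafka010.CanCommitOffsets;
    import org.apache.spark.streaming.kafka010.HasOffsetRanges;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    // enable.auto.commit stays false; offsets are committed only after the
    // batch's output operation has finished.
    stream.foreachRDD(rdd -> {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

      rdd.foreach(record -> {
        // lengthy per-record processing goes here
      });

      // commit this batch's offset ranges back to Kafka (asynchronously)
      ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });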