Cody, Thanks for the message.

1. As you mentioned, I did find the version for Kafka 0.10.1 and will use
that, although it carries lots of experimental tags. Thank you.
2. I have done a thorough investigation; it is NOT the scenario where the 1st
processing attempt failed and a 2nd attempt was then triggered.
3. I agree that the session timeout and auto commit are not the root cause here.
4. The problem I see looks like it is caused by a filter and union of the
DStream (I will try to elaborate in another question post).
If I just do kafka-stream -- process -- output operator, there is no problem.
If I do

kafka-stream -- process (1) --+-- filter stream A (for later union) ------+
                              +-- filter stream B -- process (2) ---------+-- A union B -- output process (3)

then the duplicate processing of the message starts at the end of process (1).
A rough code sketch of this topology is right below, and the relevant traces
follow it.
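The sketch is NOT my actual application code, just the shape of it. The helper
names (enrich, isTypeA, isTypeB, processTwo) are placeholders, and jsc,
kafkaParams and topicsSet are the same as in my original post quoted at the
bottom.

import java.util.Arrays;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

// kafka source, same createDirectStream call as in my original post
JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(jsc,
    String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

// process (1): the flatMapToPair at SparkAppDriver.java:136 in the traces
// (enrich() is a placeholder for the real per-message work)
JavaPairDStream<String, String> processed = ds.flatMapToPair(
    t -> Arrays.asList(new Tuple2<>(t._1(), enrich(t._2()))));

// two filtered branches of the same processed stream
JavaPairDStream<String, String> streamA = processed.filter(t -> isTypeA(t._2()));
JavaPairDStream<String, String> streamB = processed.filter(t -> isTypeB(t._2()))
    .mapToPair(t -> processTwo(t));            // process (2), placeholder

// process (3): the union at SparkAppDriver.java:155 in the traces
streamA.union(streamB).foreachRDD(rdd -> {
    // output operation
});

The traces: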

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1  (fetch EVENT 1st time)

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid.

[... log of processing (1) for event 1 ...]

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36). 1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair (processing (1)) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10, ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10 (UnionRDD[41] at union (process (3)) at SparkAppDriver.java:155), which has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1  (fetch the same EVENT 2nd time)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491460000 ms.0 from job set of time 1473491460000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time 1473491460000 ms (execution: 10.874 s)  (EVENT 1st time process cost 10.874 s)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time 1473491465000 ms (execution: 0.066 s)  (EVENT 2nd time process cost 0.066 s)

So the 2nd processing of the event finished without really doing the work.
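
My guess (not confirmed yet) is that the union stage re-computes the lineage
of the branch that is not behind a shuffle all the way back to the KafkaRDD,
which is why ShuffleMapStage 10 fetches offsets 1 -> 2 again. If that is the
case, persisting the shared stream once, before the two filters branch off it,
should let the union stage reuse the persisted blocks instead of re-fetching
from Kafka. A minimal sketch, reusing the placeholder names from the sketch
above:

// assumption: both branches read from 'processed'; if a branch reads
// straight from the kafka stream, the cache() would have to go on 'ds'
processed.cache();   // or processed.persist(StorageLevel.MEMORY_AND_DISK())
// ... the two filters, process (2), the union and the output stay the same
// as in the sketch above

(DStream cache() defaults to serialized in-memory storage, so
persist(MEMORY_AND_DISK()) may be safer if the batches are large.)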

2016-09-08 14:55 GMT-07:00 Cody Koeninger <c...@koeninger.org>:

> - If you're seeing repeated attempts to process the same message, you
> should be able to look in the UI or logs and see that a task has
> failed.  Figure out why that task failed before chasing other things
>
> - You're not using the latest version, the latest version is for spark
> 2.0.  There are two versions of the connector for spark 2.0, one for
> kafka 0.8 or higher, and one for kafka 0.10 or higher
>
> - Committing individual messages to kafka doesn't make any sense,
> spark streaming deals with batches.  If you're doing any aggregations
> that involve shuffling, there isn't even a guarantee that you'll
> process messages in order for a given topicpartition
>
> - Auto commit has no effect for the 0.8 version of createDirectStream.
> Turning it on for the 0.10 version of createDirectStream is a really
> bad idea, it will give you undefined delivery semantics, because the
> commit to Kafka is unrelated to whether the batch processed
> successfully
>
> If you're unclear on how the kafka integration works, see
>
> https://github.com/koeninger/kafka-exactly-once
>
> On Thu, Sep 8, 2016 at 4:44 PM, Cheng Yi <phillipchen...@gmail.com> wrote:
> > I am using the lastest streaming kafka connector
> > <groupId>org.apache.spark</groupId>
> > <artifactId>spark-streaming-kafka_2.11</artifactId>
> > <version>1.6.2</version>
> >
> > I am facing the problem that a message is delivered two times to my
> > consumers. these two deliveries are 10+ seconds apart, it looks this is
> > caused by my lengthy message processing (took about 60 seconds), then I
> > tried to solve this, but I am still stuck.
> >
> > 1. looks the kafka streaming connector supports kafka v0.8 and maybe v0.9
> > but not v.10
> >
> > JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(jsc,
> >     String.class, String.class, StringDecoder.class, StringDecoder.class,
> >     kafkaParams, topicsSet);
> >
> > 2. after i got the message from the kafka streaming via consumer, how can I
> > commit the message without finish the whole processing (the whole processing
> > might take minutes), it looks I can't get the consumer from the KafkaUtils
> > to execute the kafka commit API.
> >
> > 3. If I can't do the manual commit, then I need to tell Kafka Consumer to
> > allow longer session or auto commit, for v0.8 or v0.9, I have tried to pass
> > following properties to KafkaUtils
> >
> > kafkaParams.put("auto.commit.enable", "true");
> > kafkaParams.put("auto.commit.interval.ms", "1000");
> > kafkaParams.put("zookeeper.session.timeout.ms", "60000");
> > kafkaParams.put("zookeeper.connection.timeout.ms", "60000");
> >
> > Still not working.
> > Help is greatly appreciated !
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
