It turns out that the call() function runs in different stages:

...
2016-09-07 20:37:21,086 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 11.0 (TID 11)
2016-09-07 20:37:21,087 [Executor task launch worker-0] DEBUG org.apache.spark.executor.Executor - Task 11's epoch is 0
...
2016-09-07 20:37:21,096 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 11.0 (TID 11). 2412 bytes result sent to driver
...    <=== call() called here !!
...
2016-09-07 20:37:22,341 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 12.0 (TID 12)
2016-09-07 20:37:22,343 [Executor task launch worker-0] DEBUG org.apache.spark.executor.Executor - Task 12's epoch is 0
...    <=== call() called here !!
...
2016-09-07 20:37:22,362 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 12.0 (TID 12). 2518 bytes result sent to driver
...
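To illustrate the mechanism (a minimal, hypothetical sketch, not your exact job): every action on an RDD launches its own job, and each job recomputes the lineage, so a map function's call() runs once per action unless the RDD is persisted:

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CallPerActionDemo {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("CallPerActionDemo").setMaster("local[1]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> mapped = sc.parallelize(Arrays.asList("msg1", "msg2"))
                    .map(record -> {
                        // Runs once per record *per action* below, because each
                        // action triggers a new job/stage that recomputes the map.
                        System.out.println("call() on " + record);
                        return record.toUpperCase();
                    });

            mapped.count();   // action 1 -> map runs (one stage)
            mapped.collect(); // action 2 -> map runs again (another stage)

            sc.stop();
        }
    }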
Does anyone have any ideas?

On Wed, Sep 7, 2016 at 7:30 PM, Kevin Tran <kevin...@gmail.com> wrote:
> Hi Everyone,
> Does anyone know why the call() function is being called *3 times* for each
> message that arrives:
>
>     JavaDStream<String> message = messagesDStream.map(
>             new Function<Tuple2<String, String>, String>() {
>         @Override
>         public String call(Tuple2<String, String> tuple2) {
>             return tuple2._2();
>         }
>     });
>
>     message.foreachRDD(rdd -> {
>         logger.debug("---> New RDD with " + rdd.partitions().size()
>                 + " partitions and " + rdd.count() + " records");   *<== 1*
>
>         SQLContext sqlContext = new SQLContext(rdd.context());
>
>         JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
>             public JavaBean call(String record) {   *<== being called 3 times*
>
> What I tried:
> * cache()
> * cleaning up checkpoint dir
>
> Thanks,
> Kevin.
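One more thought, as a hedged suggestion rather than a confirmed fix: if the extra invocations come from several actions running over the same lineage (the rdd.count() inside the debug line is itself one action, and whatever consumes rowRDD afterwards adds more), then cache()/persist() only helps when it is applied to the RDD that is actually reused, before the first action on it. A rough sketch of that idea, with a hypothetical parseRecord() standing in for the truncated JavaBean mapping:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.storage.StorageLevel;

    message.foreachRDD(rdd -> {
        JavaRDD<JavaBean> rowRDD = rdd
                .map(record -> parseRecord(record))    // parseRecord() is hypothetical
                .persist(StorageLevel.MEMORY_ONLY());  // computed once, reused below

        long count = rowRDD.count(); // first action materializes the cache
        logger.debug("---> New RDD with " + rowRDD.partitions().size()
                + " partitions and " + count + " records");

        // ... build the DataFrame / write from rowRDD here; subsequent actions
        // read the cached partitions instead of re-running call().

        rowRDD.unpersist();
    });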