Binh, thank you very much for your comment and code. Could you please
outline an example of how to use your stream? I am new to Spark. Thanks again!

On 18 June 2015 at 14:29, Binh Nguyen Van <[email protected]> wrote:

> I haven’t tried with 1.4, but I tried with 1.3 a while ago and could not
> get serialized behavior from the default scheduler when there was a
> failure and retry, so I created a customized stream like this.
>
> // Note: generateJob, ssc.scheduler and Job are private[streaming], so this
> // code has to be compiled inside the org.apache.spark.streaming package.
> import scala.reflect.ClassTag
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Duration, Time}
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.scheduler.Job
>
> class EachSeqRDD[T: ClassTag] (
>     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>   ) extends DStream[Unit](parent.ssc) {
>
>   override def slideDuration: Duration = parent.slideDuration
>
>   override def dependencies: List[DStream[_]] = List(parent)
>
>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>
>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>     val pendingJobs = ssc.scheduler.getPendingTimes().size
>     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>     // do not generate new RDD if there is pending job
>     if (pendingJobs == 0) {
>       parent.getOrCompute(time) match {
>         case Some(rdd) => {
>           val jobFunc = () => {
>             ssc.sparkContext.setCallSite(creationSite)
>             eachSeqFunc(rdd, time)
>           }
>           Some(new Job(time, jobFunc))
>         }
>         case None => None
>       }
>     }
>     else {
>       None
>     }
>   }
> }
> object DStreamEx {
>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>       // Because the DStream is reachable from the outer object here, and
>       // because DStreams can't be serialized with closures, we can't
>       // proactively check it for serializability, so we pass the optional
>       // false to SparkContext.clean.
>       new EachSeqRDD(dStream,
>         dStream.context.sparkContext.clean(func, false)).register()
>     }
>   }
> }
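
The gating idea in generateJob above (produce no job for a batch time while an earlier job is still pending) can be sketched without Spark. This is only an analogy in plain Java, not the Spark API: `PendingGate` and its `generateJob` method are hypothetical names invented for this illustration, and the pending count is tracked by the class itself rather than by Spark's scheduler.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the "skip when a job is pending" check.
final class PendingGate {
    // 0 = nothing pending, 1 = one job has been handed out and not finished.
    private final AtomicInteger pending = new AtomicInteger();

    /** Returns a wrapped job, or empty if an earlier job has not finished. */
    Optional<Runnable> generateJob(Runnable body) {
        if (!pending.compareAndSet(0, 1)) {
            return Optional.empty(); // analogous to returning None
        }
        return Optional.of(() -> {
            try {
                body.run();
            } finally {
                pending.set(0); // allow the next batch time to produce a job
            }
        });
    }

    public static void main(String[] args) {
        PendingGate gate = new PendingGate();
        Optional<Runnable> first = gate.generateJob(() -> System.out.println("batch 1"));
        // A second request while the first job is still pending yields no job.
        System.out.println(gate.generateJob(() -> {}).isPresent());
        first.get().run();
        // Once the first job has run, a new job can be generated again.
        System.out.println(gate.generateJob(() -> {}).isPresent());
    }
}
```

Note that in this sketch a request made while a job is pending is simply dropped, not queued for later.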
>
> -Binh
>
> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <[email protected]>
> wrote:
>
>> Tathagata, thanks for your response. You are right! Everything seems
>> to work as expected.
>>
>> Could you please help me understand why the processing time of all
>> jobs for a batch is always less than 4 seconds?
>>
>> Please see my playground code below.
>>
>> The last modified time of the input (lines) RDD dump files seems to
>> match the Thread.sleep delays (20s or 5s) in the transform operation
>> or the batching interval (10s): 20s, 5s, 10s.
>>
>> However, neither the batch processing time in the Streaming tab nor
>> the last modified time of the output (words) RDD dump files reflect
>> the Thread.sleep delays.
>>
>> 07:20       3240  001_lines_...
>>       07:21 117   001_words_...
>> 07:41       37224 002_lines_...
>>       07:43 252   002_words_...
>> 08:00       37728 003_lines_...
>>       08:02 504   003_words_...
>> 08:20       38952 004_lines_...
>>       08:22 756   004_words_...
>> 08:40       38664 005_lines_...
>>       08:42 999   005_words_...
>> 08:45       38160 006_lines_...
>>       08:47 1134  006_words_...
>> 08:50       9720  007_lines_...
>>       08:51 1260  007_words_...
>> 08:55       9864  008_lines_...
>>       08:56 1260  008_words_...
>> 09:00       10656 009_lines_...
>>       09:01 1395  009_words_...
>> 09:05       11664 010_lines_...
>>       09:06 1395  010_words_...
>> 09:11       10935 011_lines_...
>>       09:11 1521  011_words_...
>> 09:16       11745 012_lines_...
>>       09:16 1530  012_words_...
>> 09:21       12069 013_lines_...
>>       09:22 1656  013_words_...
>> 09:27       10692 014_lines_...
>>       09:27 1665  014_words_...
>> 09:32       10449 015_lines_...
>>       09:32 1791  015_words_...
>> 09:37       11178 016_lines_...
>>       09:37 1800  016_words_...
>> 09:45       17496 017_lines_...
>>       09:45 1926  017_words_...
>> 09:55       22032 018_lines_...
>>       09:56 2061  018_words_...
>> 10:05       21951 019_lines_...
>>       10:06 2196  019_words_...
>> 10:15       21870 020_lines_...
>>       10:16 2322  020_words_...
>> 10:25       21303 021_lines_...
>>       10:26 2340  021_words_...
>>
>>
>> final SparkConf conf =
>>     new SparkConf().setMaster("local[4]").setAppName("WordCount");
>> try (final JavaStreamingContext context =
>>         new JavaStreamingContext(conf, Durations.seconds(10))) {
>>
>>     context.checkpoint("/tmp/checkpoint");
>>
>>     final JavaDStream<String> lines = context.union(
>>         context.receiverStream(new GeneratorReceiver()),
>>         ImmutableList.of(
>>             context.receiverStream(new GeneratorReceiver()),
>>             context.receiverStream(new GeneratorReceiver())));
>>
>>     lines.print();
>>
>>     final Accumulator<Integer> lineRddIndex =
>>         context.sparkContext().accumulator(0);
>>     lines.foreachRDD( rdd -> {
>>         lineRddIndex.add(1);
>>         final String prefix = "/tmp/"
>>             + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
>>         try (final PrintStream out =
>>                 new PrintStream(prefix + UUID.randomUUID())) {
>>             rdd.collect().forEach(s -> out.println(s));
>>         }
>>         return null;
>>     });
>>
>>     final JavaDStream<String> words =
>>         lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>     final JavaPairDStream<String, Integer> pairs =
>>         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>>     final JavaPairDStream<String, Integer> wordCounts =
>>         pairs.reduceByKey((i1, i2) -> i1 + i2);
>>
>>     final Accumulator<Integer> sleep =
>>         context.sparkContext().accumulator(0);
>>     final JavaPairDStream<String, Integer> wordCounts2 =
>>         JavaPairDStream.fromJavaDStream(
>>             wordCounts.transform( (rdd) -> {
>>                 sleep.add(1);
>>                 Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>>                 return JavaRDD.fromRDD(
>>                     JavaPairRDD.toRDD(rdd), rdd.classTag());
>>             }));
>>
>>     final Function2<List<Integer>, Optional<Integer>, Optional<Integer>>
>>         updateFunction =
>>         (values, state) -> {
>>             Integer newSum = state.or(0);
>>             for (final Integer value : values) {
>>                 newSum += value;
>>             }
>>             return Optional.of(newSum);
>>         };
>>
>>     final List<Tuple2<String, Integer>> tuples =
>>         ImmutableList.<Tuple2<String, Integer>> of();
>>     final JavaPairRDD<String, Integer> initialRDD =
>>         context.sparkContext().parallelizePairs(tuples);
>>
>>     final JavaPairDStream<String, Integer> wordCountsState =
>>         wordCounts2.updateStateByKey(
>>             updateFunction,
>>             new HashPartitioner(
>>                 context.sparkContext().defaultParallelism()),
>>             initialRDD);
>>
>>     wordCountsState.print();
>>
>>     final Accumulator<Integer> rddIndex =
>>         context.sparkContext().accumulator(0);
>>     wordCountsState.foreachRDD( rdd -> {
>>         rddIndex.add(1);
>>         final String prefix = "/tmp/"
>>             + String.format("%03d", rddIndex.localValue()) + "_words_";
>>         try (final PrintStream out =
>>                 new PrintStream(prefix + UUID.randomUUID())) {
>>             rdd.collect().forEach(s -> out.println(s));
>>         }
>>         return null;
>>     });
>>
>>     context.start();
>>     context.awaitTermination();
>> }
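
The updateFunction in the playground code above keeps a running sum per key: it starts from the previous state (or 0 if there is none) and adds every new value from the current batch. A minimal stand-alone sketch of that logic follows. It assumes `java.util.Optional` in place of the `Optional` type with `or(...)` used in the code above, and `RunningSum` is a hypothetical class name chosen only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-alone version of the per-key update logic.
final class RunningSum {
    /** Folds the new values for one key into the previous state for that key. */
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int newSum = state.orElse(0); // no previous state means start from 0
        for (int value : values) {
            newSum += value;
        }
        return Optional.of(newSum);
    }

    public static void main(String[] args) {
        Optional<Integer> state = Optional.empty();
        state = update(Arrays.asList(1, 1, 1), state); // first batch: key seen 3 times
        state = update(Arrays.asList(1, 1), state);    // second batch: key seen 2 times
        System.out.println(state.get()); // prints 5
    }
}
```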
>>
>>
>> On 17 June 2015 at 17:25, Tathagata Das <[email protected]> wrote:
>> > The default behavior should be that batch X + 1 starts processing only
>> > after batch X completes. If you are using Spark 1.4.0, could you show us a
>> > screenshot of the streaming tab, especially the list of batches? And
>> > could you also tell us if you are setting any SparkConf configurations?
>> >
>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <[email protected]> wrote:
>> >>
>> >> Is it possible to achieve serial batching with Spark Streaming?
>> >>
>> >> Example:
>> >>
>> >> I configure the Streaming Context for creating a batch every 3 seconds.
>> >>
>> >> Processing of the batch #2 takes longer than 3 seconds and creates a
>> >> backlog of batches:
>> >>
>> >> batch #1 takes 2s
>> >> batch #2 takes 10s
>> >> batch #3 takes 2s
>> >> batch #4 takes 2s
>> >>
>> >> When testing locally, it seems that processing of multiple batches
>> >> finishes at the same time:
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 12s (processed in parallel)
>> >> batch #4 finished at 15s
>> >>
>> >> How can I delay processing of the next batch, so that it is processed
>> >> only after processing of the previous batch has been completed?
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 14s (processed serially)
>> >> batch #4 finished at 16s
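
The serial finish times listed above are just the running total of the batch durations (2s, 10s, 2s, 2s). A small sketch of that arithmetic is given here. It assumes, as the figures above do, that each batch can start as soon as the previous one finishes; `SerialBatches` and `finishTimes` are hypothetical names used only for this illustration and are not part of Spark.

```java
import java.util.Arrays;

// Hypothetical helper that models strictly serial batch processing.
final class SerialBatches {
    /** finish[i] = finish[i-1] + duration[i]: a batch starts when the previous one is done. */
    static int[] finishTimes(int[] durationsSeconds) {
        int[] finish = new int[durationsSeconds.length];
        int clock = 0;
        for (int i = 0; i < durationsSeconds.length; i++) {
            clock += durationsSeconds[i];
            finish[i] = clock;
        }
        return finish;
    }

    public static void main(String[] args) {
        // Durations of batches #1 to #4 from the example above.
        System.out.println(Arrays.toString(finishTimes(new int[] {2, 10, 2, 2})));
        // prints [2, 12, 14, 16]
    }
}
```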
>> >>
>> >> I want to perform a transformation for every key only once in a given
>> >> period of time (e.g. batch duration). I find all unique keys in a
>> >> batch and perform the transformation on each key. To ensure that the
>> >> transformation is done for every key only once, only one batch can be
>> >> processed at a time. At the same time, I want that single batch to be
>> >> processed in parallel.
>> >>
>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>> >> stream = context.receiverStream(...);
>> >> stream
>> >>     .reduceByKey(...)
>> >>     .transform(...)
>> >>     .foreachRDD(output);
>> >>
>> >> Any ideas or pointers are very welcome.
>> >>
>> >> Thanks!
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: [email protected]
>> >> For additional commands, e-mail: [email protected]
>> >>
>> >
>>
>>
>
