Binh, thank you very much for your comment and code. Please could you outline an example use of your stream? I am a newbie to Spark. Thanks again!
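Binh's `EachSeqRDD` (quoted below) gets its one-batch-at-a-time behavior from its `generateJob` override. It asks `ssc.scheduler.getPendingTimes()` whether any job is still pending and, if so, returns `None` instead of a job for that batch time. The Java sketch below is a hypothetical, Spark-free model of that rule only. The class and method names are made up, durations are whole seconds, each generated job is treated as the only work in flight, and it does not model what happens to the input data of a batch time that gets no job.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Spark code) of the "skip while a job is pending" rule
 * in EachSeqRDD.generateJob: at each batch time a job is created only if the
 * previously created job has already finished; otherwise that batch time gets no job.
 */
final class SkipWhilePending {

    /** Returns the batch times (in seconds) for which a job would be generated. */
    static List<Integer> generatedBatches(int intervalSec, int[] jobDurationsSec, int batchCount) {
        List<Integer> generated = new ArrayList<>();
        int busyUntil = 0; // time at which the currently running job finishes
        int nextJob = 0;   // index into jobDurationsSec for the next generated job
        for (int i = 0; i < batchCount; i++) {
            int batchTime = i * intervalSec;
            if (batchTime >= busyUntil && nextJob < jobDurationsSec.length) {
                // no pending job: generate one for this batch time
                generated.add(batchTime);
                busyUntil = batchTime + jobDurationsSec[nextJob++];
            }
            // otherwise a job is still pending, so nothing is generated for this batch time
        }
        return generated;
    }

    public static void main(String[] args) {
        // 3 s batch interval; the second generated job takes 10 s, the others 2 s
        int[] durations = {2, 10, 2, 2};
        System.out.println(generatedBatches(3, durations, 8));
    }
}
```

With a 3 s interval and job durations of 2, 10, 2 and 2 s, the model prints `[0, 3, 15, 18]`: the batch times 6, 9 and 12 s arrive while the 10 s job is still running and get no job. In this model, batches are skipped rather than delayed, which is the trade-off of checking for pending jobs instead of queueing.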
On 18 June 2015 at 14:29, Binh Nguyen Van <[email protected]> wrote:
> I haven’t tried with 1.4, but I tried with 1.3 a while ago and could not
> get serial behavior from the default scheduler when there was a failure
> and retry, so I created a customized stream like this:
>
> class EachSeqRDD[T: ClassTag] (
>     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>   ) extends DStream[Unit](parent.ssc) {
>
>   override def slideDuration: Duration = parent.slideDuration
>
>   override def dependencies: List[DStream[_]] = List(parent)
>
>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>
>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>     val pendingJobs = ssc.scheduler.getPendingTimes().size
>     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>     // do not generate a new RDD if there is a pending job
>     if (pendingJobs == 0) {
>       parent.getOrCompute(time) match {
>         case Some(rdd) => {
>           val jobFunc = () => {
>             ssc.sparkContext.setCallSite(creationSite)
>             eachSeqFunc(rdd, time)
>           }
>           Some(new Job(time, jobFunc))
>         }
>         case None => None
>       }
>     }
>     else {
>       None
>     }
>   }
> }
>
> object DStreamEx {
>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>       // because the DStream is reachable from the outer object here, and because
>       // DStreams can't be serialized with closures, we can't proactively check
>       // it for serializability and so we pass the optional false to SparkContext.clean
>       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>     }
>   }
> }
>
> -Binh
>
>
> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <[email protected]> wrote:
>
>> Tathagata, thanks for your response. You are right! Everything seems
>> to work as expected.
>>
>> Could you please help me understand why the processing time for all
>> jobs of a batch is always less than 4 seconds?
>>
>> Please see my playground code below.
>>
>> The last modified time of the input (lines) RDD dump files seems to
>> match the Thread.sleep delays (20s or 5s) in the transform operation
>> or the batching interval (10s): 20s, 5s, 10s.
>>
>> However, neither the batch processing time in the Streaming tab nor
>> the last modified time of the output (words) RDD dump files reflects
>> the Thread.sleep delays.
>>
>> 07:20  3240  001_lines_...
>> 07:21   117  001_words_...
>> 07:41 37224  002_lines_...
>> 07:43   252  002_words_...
>> 08:00 37728  003_lines_...
>> 08:02   504  003_words_...
>> 08:20 38952  004_lines_...
>> 08:22   756  004_words_...
>> 08:40 38664  005_lines_...
>> 08:42   999  005_words_...
>> 08:45 38160  006_lines_...
>> 08:47  1134  006_words_...
>> 08:50  9720  007_lines_...
>> 08:51  1260  007_words_...
>> 08:55  9864  008_lines_...
>> 08:56  1260  008_words_...
>> 09:00 10656  009_lines_...
>> 09:01  1395  009_words_...
>> 09:05 11664  010_lines_...
>> 09:06  1395  010_words_...
>> 09:11 10935  011_lines_...
>> 09:11  1521  011_words_...
>> 09:16 11745  012_lines_...
>> 09:16  1530  012_words_...
>> 09:21 12069  013_lines_...
>> 09:22  1656  013_words_...
>> 09:27 10692  014_lines_...
>> 09:27  1665  014_words_...
>> 09:32 10449  015_lines_...
>> 09:32  1791  015_words_...
>> 09:37 11178  016_lines_...
>> 09:37  1800  016_words_...
>> 09:45 17496  017_lines_...
>> 09:45  1926  017_words_...
>> 09:55 22032  018_lines_...
>> 09:56  2061  018_words_...
>> 10:05 21951  019_lines_...
>> 10:06  2196  019_words_...
>> 10:15 21870  020_lines_...
>> 10:16  2322  020_words_...
>> 10:25 21303  021_lines_...
>> 10:26  2340  021_words_...
>>
>>
>> final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
>> try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
>>
>>     context.checkpoint("/tmp/checkpoint");
>>
>>     final JavaDStream<String> lines = context.union(
>>         context.receiverStream(new GeneratorReceiver()),
>>         ImmutableList.of(
>>             context.receiverStream(new GeneratorReceiver()),
>>             context.receiverStream(new GeneratorReceiver())));
>>
>>     lines.print();
>>
>>     final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
>>     lines.foreachRDD(rdd -> {
>>         lineRddIndex.add(1);
>>         final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
>>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>>             rdd.collect().forEach(s -> out.println(s));
>>         }
>>         return null;
>>     });
>>
>>     final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
>>     final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
>>     final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
>>
>>     final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
>>     final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
>>         wordCounts.transform((rdd) -> {
>>             sleep.add(1);
>>             Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
>>             return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
>>         }));
>>
>>     final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
>>         (values, state) -> {
>>             Integer newSum = state.or(0);
>>             for (final Integer value : values) {
>>                 newSum += value;
>>             }
>>             return Optional.of(newSum);
>>         };
>>
>>     final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>> of();
>>     final JavaPairRDD<String, Integer> initialRDD = context.sparkContext().parallelizePairs(tuples);
>>
>>     final JavaPairDStream<String, Integer> wordCountsState = wordCounts2.updateStateByKey(
>>         updateFunction,
>>         new HashPartitioner(context.sparkContext().defaultParallelism()),
>>         initialRDD);
>>
>>     wordCountsState.print();
>>
>>     final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
>>     wordCountsState.foreachRDD(rdd -> {
>>         rddIndex.add(1);
>>         final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
>>         try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
>>             rdd.collect().forEach(s -> out.println(s));
>>         }
>>         return null;
>>     });
>>
>>     context.start();
>>     context.awaitTermination();
>> }
>>
>>
>> On 17 June 2015 at 17:25, Tathagata Das <[email protected]> wrote:
>> > The default behavior should be that batch X + 1 starts processing only after
>> > batch X completes. If you are using Spark 1.4.0, could you show us a
>> > screenshot of the streaming tab, especially the list of batches? And could
>> > you also tell us if you are setting any SparkConf configurations?
>> >
>> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <[email protected]> wrote:
>> >>
>> >> Is it possible to achieve serial batching with Spark Streaming?
>> >>
>> >> Example:
>> >>
>> >> I configure the Streaming Context to create a batch every 3 seconds.
>> >>
>> >> Processing of batch #2 takes longer than 3 seconds and creates a
>> >> backlog of batches:
>> >>
>> >> batch #1 takes 2s
>> >> batch #2 takes 10s
>> >> batch #3 takes 2s
>> >> batch #4 takes 2s
>> >>
>> >> When testing locally, it seems that multiple batches finish
>> >> processing at the same time:
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 12s (processed in parallel)
>> >> batch #4 finished at 15s
>> >>
>> >> How can I delay processing of the next batch, so that it is processed
>> >> only after processing of the previous batch has been completed?
>> >>
>> >> batch #1 finished at 2s
>> >> batch #2 finished at 12s
>> >> batch #3 finished at 14s (processed serially)
>> >> batch #4 finished at 16s
>> >>
>> >> I want to perform a transformation for every key only once in a given
>> >> period of time (e.g. the batch duration). I find all unique keys in a
>> >> batch and perform the transformation on each key. To ensure that the
>> >> transformation is done for every key only once, only one batch can be
>> >> processed at a time. At the same time, I want that single batch to be
>> >> processed in parallel.
>> >>
>> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
>> >> stream = context.receiverStream(...);
>> >> stream
>> >>     .reduceByKey(...)
>> >>     .transform(...)
>> >>     .foreachRDD(output);
>> >>
>> >> Any ideas or pointers are very welcome.
>> >>
>> >> Thanks!
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: [email protected]
>> >> For additional commands, e-mail: [email protected]
>> >>
>> >
>>
>
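The question above contrasts two timelines for the same four batches. The Java sketch below is a hypothetical model of both, assuming batch i (counting from 0) arrives at i × interval and has a fixed processing time. The serial schedule starts a batch at max(arrival, previous finish); the concurrent schedule starts every batch on arrival with no limit on parallelism. The class and method names are illustrative only and are not part of Spark.

```java
import java.util.Arrays;

/**
 * Hypothetical model (not Spark code) of serial vs. concurrent batch processing.
 * Batch i arrives at i * intervalSec and takes durationsSec[i] seconds.
 */
final class BatchTimeline {

    /** Each batch waits until the previous batch has finished. */
    static int[] serialFinishTimes(int intervalSec, int[] durationsSec) {
        int[] finish = new int[durationsSec.length];
        int previousFinish = 0;
        for (int i = 0; i < durationsSec.length; i++) {
            int arrival = i * intervalSec;
            int start = Math.max(arrival, previousFinish); // wait for the previous batch
            finish[i] = start + durationsSec[i];
            previousFinish = finish[i];
        }
        return finish;
    }

    /** Each batch starts as soon as it arrives, regardless of earlier batches. */
    static int[] concurrentFinishTimes(int intervalSec, int[] durationsSec) {
        int[] finish = new int[durationsSec.length];
        for (int i = 0; i < durationsSec.length; i++) {
            finish[i] = i * intervalSec + durationsSec[i]; // no waiting at all
        }
        return finish;
    }

    public static void main(String[] args) {
        int[] durations = {2, 10, 2, 2}; // batch #1..#4, as in the question
        System.out.println("serial:     " + Arrays.toString(serialFinishTimes(3, durations)));
        System.out.println("concurrent: " + Arrays.toString(concurrentFinishTimes(3, durations)));
    }
}
```

With a 3 s interval and durations of 2, 10, 2 and 2 s, the model gives serial finish times `[2, 13, 15, 17]` and concurrent finish times `[2, 13, 8, 11]`. The serial figures are one second later than those in the question, which have batch #2 starting at 2 s rather than at a 3 s arrival. The ordering is what matters: only the serial schedule guarantees that a batch finishes before the next one starts, which is what the "transform each key only once per period" requirement depends on.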
