I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could not
get the serialized behavior by using default scheduler when there is
failure and retry
so I created a customized stream like this.
class EachSeqRDD[T: ClassTag] (
parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
override def slideDuration: Duration = parent.slideDuration
override def dependencies: List[DStream[_]] = List(parent)
override def compute(validTime: Time): Option[RDD[Unit]] = None
override private[streaming] def generateJob(time: Time): Option[Job] = {
val pendingJobs = ssc.scheduler.getPendingTimes().size
logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
// do not generate new RDD if there is pending job
if (pendingJobs == 0) {
parent.getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
ssc.sparkContext.setCallSite(creationSite)
eachSeqFunc(rdd, time)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
else {
None
}
}
}
object DStreamEx {
implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
// because the DStream is reachable from the outer object here,
and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to
SparkContext.clean
new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func,
false)).register()
}
}
}
-Binh
On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia <[email protected]> wrote:
> Tathagata, thanks for your response. You are right! Everything seems
> to work as expected.
>
> Please could help me understand why the time for processing of all
> jobs for a batch is always less than 4 seconds?
>
> Please see my playground code below.
>
> The last modified time of the input (lines) RDD dump files seems to
> match the Thread.sleep delays (20s or 5s) in the transform operation
> or the batching interval (10s): 20s, 5s, 10s.
>
> However, neither the batch processing time in the Streaming tab nor
> the last modified time of the output (words) RDD dump files reflect
> the Thread.sleep delays.
>
> 07:20 3240 001_lines_...
> 07:21 117 001_words_...
> 07:41 37224 002_lines_...
> 07:43 252 002_words_...
> 08:00 37728 003_lines_...
> 08:02 504 003_words_...
> 08:20 38952 004_lines_...
> 08:22 756 004_words_...
> 08:40 38664 005_lines_...
> 08:42 999 005_words_...
> 08:45 38160 006_lines_...
> 08:47 1134 006_words_...
> 08:50 9720 007_lines_...
> 08:51 1260 007_words_...
> 08:55 9864 008_lines_...
> 08:56 1260 008_words_...
> 09:00 10656 009_lines_...
> 09:01 1395 009_words_...
> 09:05 11664 010_lines_...
> 09:06 1395 010_words_...
> 09:11 10935 011_lines_...
> 09:11 1521 011_words_...
> 09:16 11745 012_lines_...
> 09:16 1530 012_words_...
> 09:21 12069 013_lines_...
> 09:22 1656 013_words_...
> 09:27 10692 014_lines_...
> 09:27 1665 014_words_...
> 09:32 10449 015_lines_...
> 09:32 1791 015_words_...
> 09:37 11178 016_lines_...
> 09:37 1800 016_words_...
> 09:45 17496 017_lines_...
> 09:45 1926 017_words_...
> 09:55 22032 018_lines_...
> 09:56 2061 018_words_...
> 10:05 21951 019_lines_...
> 10:06 2196 019_words_...
> 10:15 21870 020_lines_...
> 10:16 2322 020_words_...
> 10:25 21303 021_lines_...
> 10:26 2340 021_words_...
>
>
> final SparkConf conf = new
> SparkConf().setMaster("local[4]").setAppName("WordCount");
> try (final JavaStreamingContext context = new
> JavaStreamingContext(conf, Durations.seconds(10))) {
>
> context.checkpoint("/tmp/checkpoint");
>
> final JavaDStream<String> lines = context.union(
> context.receiverStream(new GeneratorReceiver()),
> ImmutableList.of(
> context.receiverStream(new GeneratorReceiver()),
> context.receiverStream(new GeneratorReceiver())));
>
> lines.print();
>
> final Accumulator<Integer> lineRddIndex =
> context.sparkContext().accumulator(0);
> lines.foreachRDD( rdd -> {
> lineRddIndex.add(1);
> final String prefix = "/tmp/" + String.format("%03d",
> lineRddIndex.localValue()) + "_lines_";
> try (final PrintStream out = new PrintStream(prefix +
> UUID.randomUUID())) {
> rdd.collect().forEach(s -> out.println(s));
> }
> return null;
> });
>
> final JavaDStream<String> words =
> lines.flatMap(x -> Arrays.asList(x.split(" ")));
> final JavaPairDStream<String, Integer> pairs =
> words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
> final JavaPairDStream<String, Integer> wordCounts =
> pairs.reduceByKey((i1, i2) -> i1 + i2);
>
> final Accumulator<Integer> sleep =
> context.sparkContext().accumulator(0);
> final JavaPairDStream<String, Integer> wordCounts2 =
> JavaPairDStream.fromJavaDStream(
> wordCounts.transform( (rdd) -> {
> sleep.add(1);
> Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
> return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
> }));
>
> final Function2<List<Integer>, Optional<Integer>,
> Optional<Integer>> updateFunction =
> (values, state) -> {
> Integer newSum = state.or(0);
> for (final Integer value : values) {
> newSum += value;
> }
> return Optional.of(newSum);
> };
>
> final List<Tuple2<String, Integer>> tuples =
> ImmutableList.<Tuple2<String, Integer>> of();
> final JavaPairRDD<String, Integer> initialRDD =
> context.sparkContext().parallelizePairs(tuples);
>
> final JavaPairDStream<String, Integer> wordCountsState =
> wordCounts2.updateStateByKey(
> updateFunction,
> new
> HashPartitioner(context.sparkContext().defaultParallelism()),
> initialRDD);
>
> wordCountsState.print();
>
> final Accumulator<Integer> rddIndex =
> context.sparkContext().accumulator(0);
> wordCountsState.foreachRDD( rdd -> {
> rddIndex.add(1);
> final String prefix = "/tmp/" + String.format("%03d",
> rddIndex.localValue()) + "_words_";
> try (final PrintStream out = new PrintStream(prefix +
> UUID.randomUUID())) {
> rdd.collect().forEach(s -> out.println(s));
> }
> return null;
> });
>
> context.start();
> context.awaitTermination();
> }
>
>
> On 17 June 2015 at 17:25, Tathagata Das <[email protected]> wrote:
> > The default behavior should be that batch X + 1 starts processing only
> after
> > batch X completes. If you are using Spark 1.4.0, could you show us a
> > screenshot of the streaming tab, especially the list of batches? And
> could
> > you also tell us if you are setting any SparkConf configurations?
> >
> > On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <[email protected]>
> wrote:
> >>
> >> Is it possible to achieve serial batching with Spark Streaming?
> >>
> >> Example:
> >>
> >> I configure the Streaming Context for creating a batch every 3 seconds.
> >>
> >> Processing of the batch #2 takes longer than 3 seconds and creates a
> >> backlog of batches:
> >>
> >> batch #1 takes 2s
> >> batch #2 takes 10s
> >> batch #3 takes 2s
> >> batch #4 takes 2s
> >>
> >> Whet testing locally, it seems that processing of multiple batches is
> >> finished at the same time:
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 12s (processed in parallel)
> >> batch #4 finished at 15s
> >>
> >> How can I delay processing of the next batch, so that is processed
> >> only after processing of the previous batch has been completed?
> >>
> >> batch #1 finished at 2s
> >> batch #2 finished at 12s
> >> batch #3 finished at 14s (processed serially)
> >> batch #4 finished at 16s
> >>
> >> I want to perform a transformation for every key only once in a given
> >> period of time (e.g. batch duration). I find all unique keys in a
> >> batch and perform the transformation on each key. To ensure that the
> >> transformation is done for every key only once, only one batch can be
> >> processed at a time. At the same time, I want that single batch to be
> >> processed in parallel.
> >>
> >> context = new JavaStreamingContext(conf, Durations.seconds(10));
> >> stream = context.receiverStream(...);
> >> stream
> >> .reduceByKey(...)
> >> .transform(...)
> >> .foreachRDD(output);
> >>
> >> Any ideas or pointers are very welcome.
> >>
> >> Thanks!
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: [email protected]
> >> For additional commands, e-mail: [email protected]
> >>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>