Tathagata, thanks for your response. You are right! Everything seems
to work as expected.

Could you please help me understand why the processing time for all
jobs in a batch is always less than 4 seconds?

Please see my playground code below.

The last modified time of the input (lines) RDD dump files seems to
match the Thread.sleep delays (20s or 5s) in the transform operation
or the batching interval (10s): 20s, 5s, 10s.

However, neither the batch processing time in the Streaming tab nor
the last modified time of the output (words) RDD dump files reflects
the Thread.sleep delays.

07:20       3240  001_lines_...
      07:21 117   001_words_...
07:41       37224 002_lines_...
      07:43 252   002_words_...
08:00       37728 003_lines_...
      08:02 504   003_words_...
08:20       38952 004_lines_...
      08:22 756   004_words_...
08:40       38664 005_lines_...
      08:42 999   005_words_...
08:45       38160 006_lines_...
      08:47 1134  006_words_...
08:50       9720  007_lines_...
      08:51 1260  007_words_...
08:55       9864  008_lines_...
      08:56 1260  008_words_...
09:00       10656 009_lines_...
      09:01 1395  009_words_...
09:05       11664 010_lines_...
      09:06 1395  010_words_...
09:11       10935 011_lines_...
      09:11 1521  011_words_...
09:16       11745 012_lines_...
      09:16 1530  012_words_...
09:21       12069 013_lines_...
      09:22 1656  013_words_...
09:27       10692 014_lines_...
      09:27 1665  014_words_...
09:32       10449 015_lines_...
      09:32 1791  015_words_...
09:37       11178 016_lines_...
      09:37 1800  016_words_...
09:45       17496 017_lines_...
      09:45 1926  017_words_...
09:55       22032 018_lines_...
      09:56 2061  018_words_...
10:05       21951 019_lines_...
      10:06 2196  019_words_...
10:15       21870 020_lines_...
      10:16 2322  020_words_...
10:25       21303 021_lines_...
      10:26 2340  021_words_...
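
To make the pattern in the listing easier to see, here is a minimal sketch
that computes the gaps between consecutive lines dump times. It assumes the
times above are minutes:seconds; the class and method names (DumpGaps,
toSeconds, gaps) are made up for illustration and are not part of my
playground code.

```java
import java.util.ArrayList;
import java.util.List;

class DumpGaps {
    // Converts a "mm:ss" string (assumed format of the listing) to seconds.
    static int toSeconds(String mmss) {
        String[] parts = mmss.split(":");
        return Integer.parseInt(parts[0]) * 60 + Integer.parseInt(parts[1]);
    }

    // Returns the difference in seconds between each pair of consecutive times.
    static List<Integer> gaps(String... times) {
        List<Integer> result = new ArrayList<>();
        for (int i = 1; i < times.length; i++) {
            result.add(toSeconds(times[i]) - toSeconds(times[i - 1]));
        }
        return result;
    }

    public static void main(String[] args) {
        // Times of the first eight lines dumps from the listing above.
        System.out.println(gaps(
            "07:20", "07:41", "08:00", "08:20",
            "08:40", "08:45", "08:50", "08:55"));
        // prints [21, 19, 20, 20, 5, 5, 5]
    }
}
```

The first four gaps are close to the 20s sleep and the following ones to
the 5s sleep; the last four gaps in the full listing are 10s, which matches
the batching interval.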


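import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

final SparkConf conf =
    new SparkConf().setMaster("local[4]").setAppName("WordCount");
try (final JavaStreamingContext context =
        new JavaStreamingContext(conf, Durations.seconds(10))) {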

    context.checkpoint("/tmp/checkpoint");

    final JavaDStream<String> lines = context.union(
        context.receiverStream(new GeneratorReceiver()),
        ImmutableList.of(
            context.receiverStream(new GeneratorReceiver()),
            context.receiverStream(new GeneratorReceiver())));

    lines.print();

    final Accumulator<Integer> lineRddIndex =
        context.sparkContext().accumulator(0);
    lines.foreachRDD( rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/"
            + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out =
                new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    final JavaDStream<String> words =
        lines.flatMap(x -> Arrays.asList(x.split(" ")));
    final JavaPairDStream<String, Integer> pairs =
        words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
    final JavaPairDStream<String, Integer> wordCounts =
        pairs.reduceByKey((i1, i2) -> i1 + i2);

    final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
    final JavaPairDStream<String, Integer> wordCounts2 =
        JavaPairDStream.fromJavaDStream(
        wordCounts.transform( (rdd) -> {
            sleep.add(1);
            // Sleep 20s for the first five calls, 5s afterwards.
            Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
            return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
        }));

    final Function2<List<Integer>, Optional<Integer>, Optional<Integer>>
        updateFunction =
        (values, state) -> {
            Integer newSum = state.or(0);
            for (final Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        };

    final List<Tuple2<String, Integer>> tuples =
        ImmutableList.<Tuple2<String, Integer>> of();
    final JavaPairRDD<String, Integer> initialRDD =
        context.sparkContext().parallelizePairs(tuples);

    final JavaPairDStream<String, Integer> wordCountsState =
        wordCounts2.updateStateByKey(
             updateFunction,
             new HashPartitioner(
                 context.sparkContext().defaultParallelism()),
             initialRDD);

    wordCountsState.print();

    final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
    wordCountsState.foreachRDD( rdd -> {
        rddIndex.add(1);
        final String prefix = "/tmp/"
            + String.format("%03d", rddIndex.localValue()) + "_words_";
        try (final PrintStream out =
                new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    context.start();
    context.awaitTermination();
}


On 17 June 2015 at 17:25, Tathagata Das <t...@databricks.com> wrote:
> The default behavior should be that batch X + 1 starts processing only after
> batch X completes. If you are using Spark 1.4.0, could you show us a
> screenshot of the streaming tab, especially the list of batches? And could
> you also tell us if you are setting any SparkConf configurations?
>
> On Wed, Jun 17, 2015 at 12:22 PM, Michal Čizmazia <mici...@gmail.com> wrote:
>>
>> Is it possible to achieve serial batching with Spark Streaming?
>>
>> Example:
>>
>> I configure the Streaming Context for creating a batch every 3 seconds.
>>
>> Processing of the batch #2 takes longer than 3 seconds and creates a
>> backlog of batches:
>>
>> batch #1 takes 2s
>> batch #2 takes 10s
>> batch #3 takes 2s
>> batch #4 takes 2s
>>
>> When testing locally, it seems that processing of multiple batches is
>> finished at the same time:
>>
>> batch #1 finished at 2s
>> batch #2 finished at 12s
>> batch #3 finished at 12s (processed in parallel)
>> batch #4 finished at 15s
>>
>> How can I delay processing of the next batch, so that it is processed
>> only after processing of the previous batch has been completed?
>>
>> batch #1 finished at 2s
>> batch #2 finished at 12s
>> batch #3 finished at 14s (processed serially)
>> batch #4 finished at 16s
>>
>> I want to perform a transformation for every key only once in a given
>> period of time (e.g. batch duration). I find all unique keys in a
>> batch and perform the transformation on each key. To ensure that the
>> transformation is done for every key only once, only one batch can be
>> processed at a time. At the same time, I want that single batch to be
>> processed in parallel.
>>
>> context = new JavaStreamingContext(conf, Durations.seconds(10));
>> stream = context.receiverStream(...);
>> stream
>>     .reduceByKey(...)
>>     .transform(...)
>>     .foreachRDD(output);
>>
>> Any ideas or pointers are very welcome.
>>
>> Thanks!
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
