Hello Ted,

The JIRA you pointed to is a different issue. Here are more details about
the issue I am talking about:

Consider this code block:

val streamingContext = new StreamingContext(sparkConfig, Seconds(2))

val totalVideoImps = streamingContext.sparkContext.accumulator(0, "TotalVideoImpressions")
val totalImps = streamingContext.sparkContext.accumulator(0, "TotalImpressions")

val stream = KafkaReader.KafkaDirectStream(streamingContext)
stream.map(KafkaAdLogParser.parseAdLogRecord)
  .filter(record => {
    totalImps += 1
    KafkaAdLogParser.isVideoRecord(record)
  })
  .map(record => {
    totalVideoImps += 1
    record.url
  })
  .window(Seconds(120))
  .count()
  .foreachRDD((rdd, time) => {
    println("Timestamp: " + ImpressionAggregator.millsToDate(time.milliseconds))
    println("Count: " + rdd.collect()(0))
    println("Total Impressions: " + totalImps.value)
    totalImps.setValue(0)
    println("Total Video Impressions: " + totalVideoImps.value)
    totalVideoImps.setValue(0)
  })
streamingContext.start()
streamingContext.awaitTermination()
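
For reference, the window call above uses the single-argument form. Here is a
minimal sketch of the same pipeline tail with the slide duration made explicit
(accumulator updates omitted for brevity); this assumes non-overlapping
120-second windows are what is intended:

// Without the second argument, window() slides at the parent stream's
// 2-second batch interval, i.e. an overlapping 120-second window every 2 seconds.
stream.map(KafkaAdLogParser.parseAdLogRecord)
  .filter(KafkaAdLogParser.isVideoRecord)
  .map(_.url)
  .window(Seconds(120), Seconds(120))
  .count()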


The batch size before the window operation is 2 seconds, and after the
window each batch covers 120 seconds. The output of the above program for
the first three 120-second batches is:

Timestamp: 2016-03-06 12:02:56,000
Count: 362195
Total Impressions: 16882431
Total Video Impressions: 362195

Timestamp: 2016-03-06 12:04:56,000
Count: 367168
Total Impressions: 19480293
Total Video Impressions: 367168

Timestamp: 2016-03-06 12:06:56,000
Count: 177711
Total Impressions: 10196677
Total Video Impressions: 177711

The Spark UI, however, shows different numbers, as attached in the image.
Also, if we check the start and end Kafka partition offsets reported by
subsequent batch entries on the UI, they do not form a continuous overall
range. All numbers are fine if we remove the window operation, though.
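
To cross-check the UI independently, here is a minimal sketch of how the
per-batch offset ranges could be printed straight from the direct stream,
before any windowing. It assumes the plain KafkaUtils.createDirectStream API
rather than our KafkaReader wrapper, and the broker list and topic name are
placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object OffsetRangeLogger {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("OffsetRangeLogger"), Seconds(2))

    // Placeholder broker list and topic; substitute the real ones.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("ad_log"))

    // Print the offset range of every 2-second batch before any windowing,
    // so the ranges can be compared against what the streaming UI reports.
    stream.foreachRDD { (rdd, time) =>
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
        println(s"batch=${time.milliseconds} topic=${r.topic} partition=${r.partition} " +
          s"from=${r.fromOffset} until=${r.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}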

I think this is a bug, and I couldn't find any existing JIRA covering it.

--
Thanks
Jatin

On Sun, Mar 6, 2016 at 8:29 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you taken a look at SPARK-12739 ?
>
> FYI
>
> On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello all,
>>
>> Consider following two code blocks:
>>
>> val ssc = new StreamingContext(sparkConfig, Seconds(2))
>> val stream = KafkaUtils.createDirectStream(...)
>>
>> a) stream.filter(filterFunc).count().foreachRDD(rdd =>
>> println(rdd.collect()))
>> b) stream.filter(filterFunc).window(Seconds(60),
>> Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))
>>
>> I have observed that in case
>> a) the UI behaves correctly and the numbers reported for each batch are
>> correct.
>> b) the UI reports numbers every 60 seconds, but the batch-id/input-size
>> etc. are for the 2-sec batch at every 60-second mark, i.e. the 30th batch,
>> 60th batch, etc. These numbers become totally useless, in fact confusing,
>> in this case, though the delay/processing-time numbers are still helpful.
>>
>> Is someone working on a fix to show aggregated numbers which will be more
>> useful?
>>
>> --
>> Thanks
>> Jatin
>>
>
>