Hi TD,

Thanks for the response. I believe I understand the concept and the need
for the filterFunction now. I have made the requisite code changes and am
keeping the app running overnight to see their effect. Hopefully this fixes
our issue.

However, there was one place where I ran into a follow-up issue and had
to disable that reduce operation for the moment in order to proceed with
testing the rest of the changes.

This particular reduceByKeyAndWindow operates on key-value pairs of type
<String, HashSet<Long>>. Once the size of a HashSet drops to 0, we remove
the corresponding key with the filter function specified as

(p -> !p._2().isEmpty())
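
For completeness, here is a trimmed-down sketch of roughly how the whole
operation is wired up. The stream name (events), the partition count, and
the helper names below are placeholders rather than our exact production
code, and it assumes the Java reduceByKeyAndWindow overload that takes a
numPartitions argument and a filter function:

import java.util.HashSet;
import scala.Tuple2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// events is a placeholder for the JavaPairDStream<String, HashSet<Long>> built upstream.
static JavaPairDStream<String, HashSet<Long>> windowedSets(
        JavaPairDStream<String, HashSet<Long>> events, int numPartitions) {

    // reduce: union of the two sets (builds a new set rather than mutating the inputs)
    Function2<HashSet<Long>, HashSet<Long>, HashSet<Long>> addSets = (a, b) -> {
        HashSet<Long> merged = new HashSet<>(a);
        merged.addAll(b);
        return merged;
    };

    // inverse reduce: subtract the set belonging to the batch that slid out of the window
    Function2<HashSet<Long>, HashSet<Long>, HashSet<Long>> removeSets = (a, b) -> {
        HashSet<Long> remaining = new HashSet<>(a);
        remaining.removeAll(b);
        return remaining;
    };

    // filterFunc: keep a key only while its set is non-empty
    Function<Tuple2<String, HashSet<Long>>, Boolean> keepNonEmpty = p -> !p._2().isEmpty();

    return events.reduceByKeyAndWindow(
        addSets,
        removeSets,
        Durations.minutes(60),   // window length (1 hour in our case)
        Durations.minutes(1),    // slide interval (our batch interval)
        numPartitions,
        keepNonEmpty);
}

(In our real code the reduce and inverse-reduce functions are the ones we
already had; the filter function is the only new piece.)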

That filter looks about right to me. However, soon after the first slide
occurs in this window, it throws the following exception and aborts that
batch. The stack trace is below. I am not quite sure what to make of it
(perhaps partly due to the late hour :-D ). Any idea what could be wrong
here? As far as I know, String and HashSet<Long> should hash quite
consistently.

Also, if there is no way to avoid this issue, I am thinking of rewriting
that part of the code to use a foldByKey or combineByKey operation over a
plain window instead of the incremental reduceByKeyAndWindow, along the
lines of the sketch below.
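
A minimal sketch of that fallback, reusing the placeholder names and imports
from the sketch above plus org.apache.spark.HashPartitioner, and assuming the
JavaPairDStream combineByKey overload that takes a Partitioner. The stream is
windowed first and the whole window is re-aggregated on every slide, so no
inverse function or filter function is needed:

import org.apache.spark.HashPartitioner;

// Non-incremental fallback: re-aggregate the full 1-hour window on every slide.
static JavaPairDStream<String, HashSet<Long>> windowedSetsFallback(
        JavaPairDStream<String, HashSet<Long>> events, int numPartitions) {

    // createCombiner: start a fresh set from the first value seen for a key
    Function<HashSet<Long>, HashSet<Long>> createCombiner = v -> new HashSet<>(v);

    // mergeValue: fold another value's set into the running combiner
    Function2<HashSet<Long>, HashSet<Long>, HashSet<Long>> mergeValue = (acc, v) -> {
        acc.addAll(v);
        return acc;
    };

    // mergeCombiners: union two partial combiners from different partitions
    Function2<HashSet<Long>, HashSet<Long>, HashSet<Long>> mergeCombiners = (a, b) -> {
        a.addAll(b);
        return a;
    };

    return events
        .window(Durations.minutes(60), Durations.minutes(1))
        .combineByKey(createCombiner, mergeValue, mergeCombiners,
            new HashPartitioner(numPartitions));
}

The obvious downside is that every slide recomputes the aggregation over the
full hour of data instead of incrementally adding and subtracting one batch,
but keys with no data in the window simply disappear, so no filter function is
needed to expire them.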

Thanks
Nikunj


java.lang.Exception: Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?
        at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147)
        at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)




On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das <t...@databricks.com> wrote:

> Responses inline.
>
> On Thu, Jul 16, 2015 at 9:27 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hi TD,
>>
>> Yes, we do have the invertible function provided. However, I am not sure
>> I understood how to use the filterFunction. Is there an example
>> somewhere showing its usage?
>>
>> The header comment on the function says :
>>
>> * @param filterFunc     function to filter expired key-value pairs;
>> *                       only pairs that satisfy the function are retained
>> *                       set this to null if you do not want to filter
>>
>> These are the questions I am confused about:
>>
>> 1. The code comment seems to imply that the filterFunc is only used to
>> figure out which key-value pairs are used to form the window, but how does it
>> actually help expire the old data?
>>
> It applies the filter and retains only the keys that pass through it.
> Underneath, it's all RDDs, so only the filtered K, V pairs are retained (and
> cached) for future batches.
>
>
>> 2. Shouldn't the values that are "falling off" of the window period 
>> automatically be removed without the need for an additional filter function?
>>
> It cannot figure out the "falling off" in this incremental version.
> For example, if you are counting over the window by adding (reduceFunc) and
> subtracting (invReduceFunc), unless you provide the concept of a "zero",
> it will not know when to throw away the keys that have become 0. Over a
> window, the count may increase from "nothing" to 10, and then reduce to "0"
> when the window moves forward, but it does not know that "0" means "don't
> track it any more". The filter function introduces that concept of zero.
>
>
>
>> 3. Which key-value pairs are passed to this function? The ones
>> that are coming in, or the ones that are going out of the window, or both?
>>
> All of the K, V pairs that are being tracked.
>
>
>
>> 4. The key-value pairs in use in a particular reduceByKeyAndWindow operation
>> may not have the requisite info (such as a timestamp or similar, e.g. if it is
>> aggregated data) to help determine whether to return true or false. What are
>> the expected semantics here?
>>
>>
> I am not sure I get your question. It is up to you to provide sufficient
> information as part of the "value" so that you can take that decision in
> the filter function.
>
>
>
>> As always, thanks for your help
>>
>> Nikunj
>>
>>
>>
>>
>>
>> On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Make sure you provide the filterFunction with the invertible
>>> reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the
>>> key space will continue to increase. This is what is leading to the lag. So
>>> use the filtering function to filter out the keys that are not needed any
>>> more.
>>>
>>> On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> What is your data volume? Do you have checkpointing/WAL enabled? In
>>>> that case make sure you are using SSD disks, as this behavior is mainly due
>>>> to IO wait.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Jul 16, 2015 at 8:43 AM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have a Spark streaming application and the problem that we are
>>>>> encountering is that the batch processing time keeps on increasing and
>>>>> eventually causes the application to start lagging. I am hoping that
>>>>> someone here can point me to any underlying cause of why this might 
>>>>> happen.
>>>>>
>>>>> The batch interval is 1 minute as of now and the app does some maps,
>>>>> filters, joins and reduceByKeyAndWindow operations. All the reduces are
>>>>> invertible functions and so we do provide the inverse-reduce functions in
>>>>> all those. The largest window size we have is 1 hour right now. When the
>>>>> app is started, we see that the batch processing time is between 20 and 30
>>>>> seconds. It keeps creeping up slowly and by the time it hits the 1 hour
>>>>> mark, it somewhere around 35-40 seconds. Somewhat expected and still not
>>>>> bad!
>>>>>
>>>>> I would expect that since the largest window we have is 1 hour long,
>>>>> the application should stabilize around the 1 hour mark and start
>>>>> processing subsequent batches within that 35-40 second zone. However, that
>>>>> is not what is happening. The processing time still keeps increasing, and
>>>>> eventually, in a few hours, it exceeds the 1-minute mark and the app starts
>>>>> lagging. Eventually the lag builds up to several minutes, at which point we
>>>>> have to restart the system.
>>>>>
>>>>> Any pointers on why this could be happening and what we can do to
>>>>> troubleshoot further?
>>>>>
>>>>> Thanks
>>>>> Nikunj
>>>>>
>>>>>
>>>>
>>>
>>
>
