Hi TD,

Thanks for the response. I believe I understand the concept and the need for the filter function now. I made the requisite code changes and am keeping the application running overnight to see the effect. Hopefully this fixes our issue.
However, there was one place where I encountered a follow-up issue and had to disable that reduce operation for the moment in order to proceed with testing the rest of the changes. This particular reduceByKeyAndWindow operates on key-value pairs of <String, HashSet<Long>>. Once the size of a HashSet drops to 0, we remove the corresponding key with the filter function specified as (p -> !p._2().isEmpty()). That looks about right to me. However, soon after the first slide occurs in this window, it throws the following exception and aborts that batch. The stack trace is below, and a simplified sketch of the operation follows it.

I am not quite sure what to make of it (perhaps partly due to the late hour :-D). Any idea what could be wrong here? As far as I know, String and HashSet<Long> should hash quite consistently. Also, if there is no way to avoid this issue, I am thinking of rewriting that part of the code to use a foldByKey or combineByKey operation instead of reduceByKey.

Thanks
Nikunj

java.lang.Exception: Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?
    at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147)
    at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
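For reference, here is a simplified sketch of how that operation is wired up; the window/slide durations, the partition count, and the class/variable names are illustrative, not our exact code:

import java.util.HashSet;

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class WindowedSetsSketch {
    // Per key, maintain the set of ids seen within the window and
    // stop tracking a key once its set becomes empty.
    public static JavaPairDStream<String, HashSet<Long>> windowed(
            JavaPairDStream<String, HashSet<Long>> pairs) {
        return pairs.reduceByKeyAndWindow(
            // reduceFunc: union of the two sets (copied to avoid mutating cached values)
            (a, b) -> { HashSet<Long> out = new HashSet<>(a); out.addAll(b); return out; },
            // invReduceFunc: remove the elements that are leaving the window
            (a, b) -> { HashSet<Long> out = new HashSet<>(a); out.removeAll(b); return out; },
            Durations.minutes(60),  // window duration (illustrative)
            Durations.minutes(1),   // slide duration (illustrative)
            4,                      // numPartitions (illustrative)
            // filterFunc: drop keys whose set has shrunk to empty
            (Tuple2<String, HashSet<Long>> p) -> !p._2().isEmpty());
    }
}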
On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das <t...@databricks.com> wrote:

> Responses inline.
>
> On Thu, Jul 16, 2015 at 9:27 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hi TD,
>>
>> Yes, we do have the invertible function provided. However, I am not sure
>> I understood how to use the filterFunction. Is there an example
>> somewhere showing its usage?
>>
>> The header comment on the function says:
>>
>> * @param filterFunc function to filter expired key-value pairs;
>> *                   only pairs that satisfy the function are retained;
>> *                   set this to null if you do not want to filter
>>
>> These are the questions I am confused about:
>>
>> 1. The code comment seems to imply that the filterFunc is only used to
>> figure out which key-value pairs are used to form the window, but how does
>> it actually help expire the old data?
>
> It applies the filter and retains only the keys that pass through it.
> Underneath, it's all RDDs, so only the filtered K, V pairs are retained
> (and cached) for future batches.
>
>> 2. Shouldn't the values that are "falling off" of the window period
>> automatically be removed without the need for an additional filter
>> function?
>
> It cannot figure out the "falling off" in this incremental version. For
> example, if you are counting over the window by adding (reduceFunc) and
> subtracting (invReduceFunc), unless you provide the concept of a "zero" it
> will not know when to throw away the keys that have become 0. Over a
> window, the count may increase from "nothing" to 10, and then reduce to 0
> when the window moves forward, but it does not know that 0 means "don't
> track it any more". The filter function introduces that concept of zero.
>
>> 3. Which side of the key-value pairs are passed to this function? The
>> ones that are coming in, the ones that are going out of the window, or
>> both?
>
> All of the K, V pairs that are being tracked.
>
>> 4. The key-value pairs in use in a particular reduceByKeyAndWindow
>> operation may not have the requisite info (such as a timestamp or
>> similar, e.g. if it is aggregated data) to help determine whether to
>> return true or false. What is the semantic expected here?
>
> I am not sure I get your question. It is up to you to provide sufficient
> information as part of the "value" so that you can take that decision in
> the filter function.
>
>> As always, thanks for your help
>>
>> Nikunj
>>
>> On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Make sure you provide the filterFunction with the invertible
>>> reduceByKeyAndWindow. Otherwise none of the keys will get removed, and
>>> the key space will continue to increase. This is what is leading to the
>>> lag. So use the filtering function to filter out the keys that are not
>>> needed any more.
>>>
>>> On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> What is your data volume? Do you have checkpointing/WAL enabled? In
>>>> that case, make sure you are using SSD disks, as this behavior is
>>>> mainly due to the IO wait.
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Jul 16, 2015 at 8:43 AM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have a Spark Streaming application, and the problem we are
>>>>> encountering is that the batch processing time keeps increasing and
>>>>> eventually causes the application to start lagging. I am hoping that
>>>>> someone here can point me to any underlying cause of why this might
>>>>> happen.
>>>>>
>>>>> The batch interval is 1 minute as of now, and the app does some maps,
>>>>> filters, joins and reduceByKeyAndWindow operations. All the reduces
>>>>> are invertible functions, and so we do provide the inverse-reduce
>>>>> functions for all of them. The largest window size we have is 1 hour
>>>>> right now. When the app is started, we see that the batch processing
>>>>> time is between 20 and 30 seconds. It keeps creeping up slowly, and by
>>>>> the time it hits the 1 hour mark it is somewhere around 35-40 seconds.
>>>>> Somewhat expected, and still not bad!
>>>>>
>>>>> I would expect that since the largest window we have is 1 hour long,
>>>>> the application should stabilize around the 1 hour mark and start
>>>>> processing subsequent batches within that 35-40 second zone. However,
>>>>> that is not what is happening. The processing time still keeps
>>>>> increasing, and eventually within a few hours it exceeds the 1 minute
>>>>> mark and then starts lagging. Eventually the lag builds up to minutes,
>>>>> at which point we have to restart the system.
>>>>>
>>>>> Any pointers on why this could be happening and what we can do to
>>>>> troubleshoot further?
>>>>>
>>>>> Thanks
>>>>> Nikunj
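P.S. To double-check that I am reading your "zero" explanation correctly, here is a minimal counting sketch in that same spirit; all names, durations and the partition count here are illustrative:

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class WindowedCountsSketch {
    // Sliding count per key; the filter function supplies the "zero":
    // once a key's count drops to 0, it is no longer tracked or cached.
    public static JavaPairDStream<String, Long> windowedCounts(
            JavaPairDStream<String, Long> counts) {
        return counts.reduceByKeyAndWindow(
            (a, b) -> a + b,        // reduceFunc: add counts entering the window
            (a, b) -> a - b,        // invReduceFunc: subtract counts leaving the window
            Durations.minutes(60),  // window duration (illustrative)
            Durations.minutes(1),   // slide duration (illustrative)
            4,                      // numPartitions (illustrative)
            p -> p._2() > 0);       // filterFunc: stop tracking keys once their count is 0
    }
}

If I have this right, without the filter function a key whose count has dropped to 0 would keep being carried forward (and cached) in every subsequent window, which matches the ever-growing key space you described.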