We have a Spark Streaming application with approximately 12 stages per
batch, running with 4-second batches. Each stage persists its output to
Cassandra.

The pipeline stages:

Stage 1:

---> receive Stream A --> map --> filter --> (union with another stream B)
--> map --> groupByKey --> transform --> reduceByKey --> map

We then go through a few more stages of transforms and save to the database.
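In code terms, stage 1 looks roughly like the following. This is only a
simplified sketch, not the real job: the sources, record types, keys, and
per-step logic are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val conf = new SparkConf().setAppName("streaming-pipeline")
val ssc  = new StreamingContext(conf, Seconds(4))        // 4-second batches

// placeholder sources; the real job uses its own receivers
val streamA: DStream[String] = ssc.socketTextStream("hostA", 9999)
val streamB: DStream[String] = ssc.socketTextStream("hostB", 9999)

val stage1: DStream[(String, Long)] = streamA
  .map(_.trim)                               // map
  .filter(_.nonEmpty)                        // filter
  .union(streamB)                            // union with another stream B
  .map(line => (line.split(",")(0), 1L))     // map to (key, value)
  .groupByKey()                              // groupByKey
  .transform(rdd => rdd.mapValues(_.sum))    // transform
  .reduceByKey(_ + _)                        // reduceByKey
  .map(identity)                             // map

stage1.foreachRDD { rdd =>
  // this stage's output is written to Cassandra here
  // (e.g. via the Spark Cassandra connector)
}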

Around stage 5, we union the output DStream from stage 1 (shown in red) with
another stream (generated by a split during stage 2) and save that state.
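Continuing the sketch above, the stage 5 part is roughly this (how the
stage 2 split is actually produced is again a placeholder):

// placeholder for the stream split off during stage 2
val splitFromStage2: DStream[(String, Long)] =
  stage1.filter { case (key, _) => key.startsWith("x") }

// the union that appears to trigger the recomputation of stage 1
val stage5 = stage1.union(splitFromStage2)

stage5.foreachRDD { rdd =>
  // the combined state is saved to Cassandra here
}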

It appears the whole execution up to that point is repeated, which is
redundant (I can see this both in the execution graph and in the batch
processing times). Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as
2.5 times slower than runs without the union, even though for most batches
the union does not alter the original DStream (it is a union with an empty
set). If I cache the DStream (the red block's output), performance improves
substantially, but I hit out-of-memory errors within a few hours.
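For reference, the caching attempt is essentially just this, continuing the
sketch above (the storage level is just what I happened to pick):

import org.apache.spark.storage.StorageLevel

// persist the stage 1 output so the union in stage 5 does not recompute it;
// this is what eventually runs out of memory
stage1.persist(StorageLevel.MEMORY_ONLY_SER)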

What is the recommended way to cache/unpersist in such a scenario? There is
no DStream-level unpersist.

setting "spark.streaming.unpersist" to true and
streamingContext.remember("duration") did not help. Still seeing out of
memory errors

Krishna
