"You need to keep a certain number of rdds around for checkpointing" --
that seems like a hefty expense to pay in order to achieve fault
tolerance.  Why does Spark persist whole RDDs of data?  Shouldn't it be
sufficient to just persist the offsets, to know where to resume from?
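
To make the question concrete, a minimal sketch of offsets-only state might
look like the following. It is plain Java with no Spark or Kafka dependency,
and every name in it (OffsetRangeSketch, serialize, deserialize) is
hypothetical and not part of any Spark API. It only illustrates that a batch
could in principle be described by (topic, partition, fromOffset, untilOffset)
tuples that are small enough to persist and reload cheaply.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: describe a batch by its Kafka offset ranges only,
// not by the records themselves. None of these names come from Spark.
final class OffsetRangeSketch {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    OffsetRangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    // Number of messages the range covers.
    long count() {
        return untilOffset - fromOffset;
    }

    // One line per range: topic,partition,from,until.
    // Assumes topic names contain no commas or newlines.
    static String serialize(List<OffsetRangeSketch> ranges) {
        StringBuilder sb = new StringBuilder();
        for (OffsetRangeSketch r : ranges) {
            sb.append(r.topic).append(',')
              .append(r.partition).append(',')
              .append(r.fromOffset).append(',')
              .append(r.untilOffset).append('\n');
        }
        return sb.toString();
    }

    // Inverse of serialize(); blank lines are skipped.
    static List<OffsetRangeSketch> deserialize(String text) {
        List<OffsetRangeSketch> out = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (line.isEmpty()) {
                continue;
            }
            String[] f = line.split(",");
            out.add(new OffsetRangeSketch(
                f[0],
                Integer.parseInt(f[1]),
                Long.parseLong(f[2]),
                Long.parseLong(f[3])));
        }
        return out;
    }
}
```

With state like this, resuming would mean re-reading each range from Kafka
rather than reloading stored data, which assumes the messages are still
retained on the brokers.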

Thanks.

On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You need to keep a certain number of rdds around for checkpointing, based
> on e.g. the window size.  Those would all need to be loaded at once.
>
> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Would there be a way to chunk up/batch up the contents of the
>> checkpointing directories as they're being processed by Spark Streaming?
>> Is it mandatory to load the whole thing in one go?
>>
>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I wonder whether, during recovery from a checkpoint, we can estimate the
>>> size of the checkpoint and compare it with
>>> Runtime.getRuntime().freeMemory().
>>>
>>> If the size of the checkpoint is much bigger than free memory, log a
>>> warning, etc.
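
A rough sketch of that kind of check, in plain Java, might look like this.
It assumes the checkpoint directory is on a local filesystem (a real
deployment would more likely use HDFS and need a different way to get the
size), and the class name, method names and the "factor" threshold are all
hypothetical. Only Runtime.getRuntime().freeMemory() comes from the
suggestion above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper, not part of Spark: warn when a checkpoint directory
// looks much larger than the JVM's currently free heap.
final class CheckpointSizeCheck {

    // Total size in bytes of all regular files under dir (local filesystem only).
    static long directorySize(Path dir) {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                        .mapToLong(CheckpointSizeCheck::sizeOf)
                        .sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static long sizeOf(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "Much bigger" is taken to mean more than factor times the free memory;
    // the factor is an arbitrary tuning knob, not something from Spark.
    static boolean exceeds(long checkpointBytes, long freeBytes, double factor) {
        return checkpointBytes > freeBytes * factor;
    }

    // Logs a warning to stderr and returns true when the check trips.
    static boolean warnIfTooLarge(Path dir, double factor) {
        long size = directorySize(dir);
        long free = Runtime.getRuntime().freeMemory();
        boolean tooLarge = exceeds(size, free, factor);
        if (tooLarge) {
            System.err.println("WARN: checkpoint at " + dir + " is " + size
                + " bytes, free memory is only " + free + " bytes");
        }
        return tooLarge;
    }
}
```

Two caveats: freeMemory() only reports free space in the heap allocated so
far, so maxMemory() - totalMemory() + freeMemory() may be a closer estimate
of the real headroom, and the on-disk size is only a rough proxy for what the
checkpoint occupies once deserialized.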
>>>
>>> Cheers
>>>
>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>>>> have the original checkpointing directory :(  Thanks for the clarification
>>>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>>>
>>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> That looks like it's during recovery from a checkpoint, so it'd be
>>>>> driver memory not executor memory.
>>>>>
>>>>> How big is the checkpoint directory that you're trying to restore from?
>>>>>
>>>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> We're getting the error below.  We tried increasing
>>>>>> spark.executor.memory, e.g. from 1g to 2g, but the error still
>>>>>> happens.
>>>>>>
>>>>>> Any recommendations? Something to do with specifying -Xmx in the
>>>>>> submit job scripts?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>> at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>>>>> at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>>>>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>>>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>>>>> at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>>>>> at org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>>>>> at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>> at org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>>>>> at org.apache.spark.rdd.RDD.<init>(RDD.scala:1365)
>>>>>> at org.apache.spark.streaming.kafka.KafkaRDD.<init>(KafkaRDD.scala:46)
>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>> at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>>>> at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>>>> at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
