You could override the spark conf called
"spark.streaming.receiver.writeAheadLog.class" with the class name.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala#L30

On Thu, Sep 17, 2015 at 2:04 PM, Michal Čizmazia <mici...@gmail.com> wrote:

> Please could you explain how to use pluggable WAL?
>
> After I implement the WriteAheadLog abstract class, how can I use it? I
> want to use it with a Custom Reliable Receiver. I am using Spark 1.4.1.
>
> Thanks!
>
>
> On 17 September 2015 at 16:40, Tathagata Das <t...@databricks.com> wrote:
>
>> Actually, the current WAL implementation (as of Spark 1.5) does not work
>> with S3 because S3 does not support flushing. Basically, the current
>> implementation assumes that after write + flush, the data is immediately
>> durable, and readable if the system crashes without closing the WAL file.
>> This does not work with S3 as data is durable only and only if the S3 file
>> output stream is cleanly closed.
>>
>>
>>
>>
>> On Thu, Sep 17, 2015 at 1:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> I assume you don't use Kinesis.
>>>
>>> Are you running Spark 1.5.0 ?
>>> If you must use S3, is switching to Kinesis possible ?
>>>
>>> Cheers
>>>
>>> On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia <mici...@gmail.com>
>>> wrote:
>>>
>>>> How to make Write Ahead Logs to work with S3? Any pointers welcome!
>>>>
>>>> It seems as a known issue:
>>>> https://issues.apache.org/jira/browse/SPARK-9215
>>>>
>>>> I am getting this exception when reading write ahead log:
>>>>
>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>>> due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
>>>> failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
>>>> org.apache.spark.SparkException: Could not read data from write ahead log
>>>> record
>>>> FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
>>>>         at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>         at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>         at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>>>         at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>         at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.NullPointerException
>>>>         at
>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
>>>>         at
>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>         ... 13 more
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to