I've raised a JIRA for checkpointing. Hopefully it gets some priority, as it's
very useful and relatively straightforward to implement:

https://issues.apache.org/jira/browse/SPARK-11879
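
For context, the workaround that a proper DataFrame checkpoint would replace looks roughly like this with the DataFrame reader/writer API (Spark 1.4+): write the DataFrame out as Parquet, then read it back explicitly. This is only a sketch. `CheckpointWorkaround` and `checkpointViaParquet` are names made up for illustration, not Spark APIs, and `path` is assumed not to exist yet.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

object CheckpointWorkaround {
  // Hypothetical helper, not part of Spark: materialise `df` as Parquet
  // under `path`, then return a new DataFrame that reads those files back.
  // The returned DataFrame's logical plan is a scan of the written files,
  // so the plan accumulated by `df` is no longer carried along.
  def checkpointViaParquet(sqlContext: SQLContext, df: DataFrame, path: String): DataFrame = {
    // Default save mode fails if `path` already exists.
    df.write.parquet(path)
    // The explicit re-read is the step a built-in checkpoint would avoid.
    sqlContext.read.parquet(path)
  }
}
```

The caller has to switch to the returned DataFrame; the original `df` still carries its full plan, which is the inconvenience the JIRA is about.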

On 18 November 2015 at 16:31, Cristian O <cristian.b.op...@googlemail.com>
wrote:

> Hi,
>
> While these OSS efforts are interesting, they're for now quite unproven.
> Personally, I would be much more interested in seeing Spark incrementally
> move towards supporting updates to DataFrames on various storage
> substrates, and first of all locally, perhaps as an extension of cached
> DataFrames.
>
> However, before we get full-blown update support, I would suggest two
> enhancements that are fairly straightforward with the current design. If
> they make sense, please let me know and I'll add them as Jiras:
>
> 1. Checkpoint support for DataFrames - as mentioned this can be as simple
> as saving to a parquet file or some other format, but would not require
> re-reading the file to alter the lineage, and would also prune the logical
> plan. Alternatively checkpointing a cached DataFrame can delegate to
> checkpointing the underlying RDD but again needs to prune the logical plan.
>
> 2. Efficient transformation of cached DataFrames to cached DataFrames - an
> efficient copy-on-write mechanism can be used to avoid unpacking
> CachedBatches (row groups) into InternalRows when building a cached
> DataFrame out of a source cached DataFrame through transformations (like an
> outer join) that only affect a small subset of rows. Statistics and
> partitioning information can be used to determine which row groups are
> affected and which can be copied *by reference* unchanged. This would
> effectively allow performing immutable updates of cached DataFrames in
> scenarios like Streaming or other iterative use cases like ML.
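
To make point 2 concrete, here is a toy model in plain Scala, with no Spark involved. `Batch`, `toBatches` and `applyDelta` are hypothetical names standing in for CachedBatch-style row groups with min/max key statistics; the real in-memory columnar cache does not expose anything like this. It only illustrates the idea of rebuilding the affected row groups and sharing the rest by reference.

```scala
object CowSketch {
  // Hypothetical stand-in for a cached row group: an immutable, non-empty
  // block of (key, value) rows plus min/max key statistics.
  final case class Batch(rows: Vector[(Int, Int)]) {
    val minKey: Int = rows.map(_._1).min
    val maxKey: Int = rows.map(_._1).max
    // Range check only: it can report a batch that does not actually hold
    // the key, but it never misses a batch that does.
    def mayContain(key: Int): Boolean = minKey <= key && key <= maxKey
  }

  // Split rows into fixed-size batches, in the order given.
  def toBatches(rows: Seq[(Int, Int)], batchSize: Int): Vector[Batch] =
    rows.grouped(batchSize).map(g => Batch(g.toVector)).toVector

  // Add the delta values by key, like the LEFT JOIN update in this thread.
  // A batch whose key range matches no delta key is returned as the same
  // object (copied by reference); only the others are rebuilt.
  def applyDelta(batches: Vector[Batch], delta: Map[Int, Int]): Vector[Batch] =
    batches.map { b =>
      if (delta.keys.exists(b.mayContain))
        Batch(b.rows.map { case (k, v) => (k, v + delta.getOrElse(k, 0)) })
      else b
    }
}
```

With the 1 million keys from the example further down the thread split into batches of 10,000, and a delta on keys 1 and 50000, only 2 of the 100 batches are rebuilt; the other 98 are the same objects as before.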
>
> Thanks,
> Cristian
>
>
>
> On 16 November 2015 at 08:30, Mark Hamstra <m...@clearstorydata.com>
> wrote:
>
>> FiloDB is also closely related. https://github.com/tuplejump/FiloDB
>>
>> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>>> input/output format support:
>>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>>
>>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <r...@databricks.com>
>>> wrote:
>>>
>>>> This (updates) is something we are going to think about in the next
>>>> release or two.
>>>>
>>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>>>> cristian.b.op...@googlemail.com> wrote:
>>>>
>>>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>>>> well, so I'm self replying and taking the opportunity to illustrate with
>>>>> an example.
>>>>>
>>>>> Basically I want to conceptually do this:
>>>>>
>>>>> import sqlContext.implicits._  // needed for toDF
>>>>>
>>>>> val bigDf = sqlContext.sparkContext.parallelize(1 to 1000000).map(i => (i, 1)).toDF("k", "v")
>>>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>>>>
>>>>> bigDf.cache()
>>>>>
>>>>> bigDf.registerTempTable("big")
>>>>> deltaDf.registerTempTable("delta")
>>>>>
>>>>> val newBigDf = sqlContext.sql(
>>>>>   "SELECT big.k, big.v + IF(delta.v IS NULL, 0, delta.v) " +
>>>>>   "FROM big LEFT JOIN delta ON big.k = delta.k")
>>>>>
>>>>> newBigDf.cache()
>>>>> bigDf.unpersist()
>>>>>
>>>>>
>>>>> This is essentially an update of keys "1" and "50000" only, in a
>>>>> dataset of 1 million keys.
>>>>>
>>>>> This can be achieved efficiently if the join preserves the cached
>>>>> blocks that are unaffected, and only copies and mutates the 2 affected
>>>>> blocks corresponding to the matching join keys.
>>>>>
>>>>> Statistics can determine which blocks actually need mutating. Note
>>>>> also that shuffling is not required assuming both dataframes are
>>>>> pre-partitioned by the same key K.
>>>>>
>>>>> In SQL this could actually be expressed as an UPDATE statement or for
>>>>> a more generalized use as a MERGE UPDATE:
>>>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>>>
>>>>> While this may seem like a very special case optimization, it would
>>>>> effectively implement UPDATE support for cached DataFrames, for both
>>>>> optimal and non-optimal usage.
>>>>>
>>>>> I appreciate there's quite a lot here, so thank you for taking the
>>>>> time to consider it.
>>>>>
>>>>> Cristian
>>>>>
>>>>>
>>>>>
>>>>> On 12 November 2015 at 15:49, Cristian O <
>>>>> cristian.b.op...@googlemail.com> wrote:
>>>>>
>>>>>> Hi Reynold,
>>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> Parquet may very well be used as the underlying implementation, but
>>>>>> this is more than about a particular storage representation.
>>>>>>
>>>>>> There are a few things here that are inter-related and open different
>>>>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>>>>
>>>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>>>>> parquet, just using that as a checkpoint would currently require
>>>>>> explicitly reading it back. A proper checkpoint implementation would
>>>>>> just save (perhaps asynchronously) and prune the logical plan while
>>>>>> allowing one to continue using the same DF, now backed by the checkpoint.
>>>>>>
>>>>>> It's important to prune the logical plan to avoid all kinds of issues
>>>>>> that may arise from unbounded expansion with iterative use-cases, like
>>>>>> this one I encountered recently:
>>>>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>>>>
>>>>>> But really what I'm after here is:
>>>>>>
>>>>>> 2. Efficient updating of cached DataFrames - The main use case here
>>>>>> is keeping a relatively large dataset cached and updating it iteratively
>>>>>> from streaming. For example one would like to perform ad-hoc queries on
>>>>>> an incrementally updated, cached DataFrame. I expect this is already
>>>>>> becoming an increasingly common use case. Note that the dataset may
>>>>>> require merging (like adding) or overriding values by key, so simply
>>>>>> appending is not sufficient.
>>>>>>
>>>>>> This is very similar in concept to updateStateByKey for regular
>>>>>> RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at
>>>>>> CachedBatch level (the row blocks for the columnar representation).
>>>>>>
>>>>>> This can currently be simulated with UNION or (OUTER) JOINs, but that
>>>>>> is very inefficient as it requires copying and recaching the entire
>>>>>> dataset, and unpersisting the original one. There are also the
>>>>>> aforementioned problems with unbounded logical plans (physical plans are
>>>>>> fine).
>>>>>>
>>>>>> These two together, checkpointing and updating cached DataFrames,
>>>>>> would give fault-tolerant efficient updating of DataFrames, meaning
>>>>>> streaming apps can take advantage of the compact columnar representation
>>>>>> and Tungsten optimisations.
>>>>>>
>>>>>> I'm not quite sure if something like this can be achieved by other
>>>>>> means or has been investigated before, hence why I'm looking for feedback
>>>>>> here.
>>>>>>
>>>>>> While one could use external data stores, they would have the added
>>>>>> IO penalty, plus most of what's available at the moment is either HDFS
>>>>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>>>>> space overhead over columnar formats.
>>>>>>
>>>>>> Thanks,
>>>>>> Cristian
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the email. Can you explain what the difference is between
>>>>>>> this and existing formats such as Parquet/ORC?
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>>>>> cristian.b.op...@googlemail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was wondering if there's any planned support for local disk
>>>>>>>> columnar storage.
>>>>>>>>
>>>>>>>> This could be an extension of the in-memory columnar store, or
>>>>>>>> possibly something similar to the recently added local checkpointing
>>>>>>>> for RDDs.
>>>>>>>>
>>>>>>>> This could also have the added benefit of enabling iterative usage
>>>>>>>> for DataFrames by pruning the query plan through local checkpoints.
>>>>>>>>
>>>>>>>> A further enhancement would be to add update support to the
>>>>>>>> columnar format (in the immutable copy-on-write sense of course), by
>>>>>>>> maintaining references to unchanged row blocks and only copying and
>>>>>>>> mutating the ones that have changed.
>>>>>>>>
>>>>>>>> A use case here is streaming and merging updates in a large dataset
>>>>>>>> that can be efficiently stored internally in a columnar format, rather
>>>>>>>> than accessing a more inefficient external data store like HDFS or
>>>>>>>> Cassandra.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cristian
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
