Raised this for checkpointing; hopefully it gets some priority, as it's very useful and relatively straightforward to implement:
https://issues.apache.org/jira/browse/SPARK-11879

On 18 November 2015 at 16:31, Cristian O <cristian.b.op...@googlemail.com> wrote:

> Hi,
>
> While these OSS efforts are interesting, they are for now quite unproven. Personally I would be much more interested in seeing Spark incrementally move towards supporting updates to DataFrames on various storage substrates, and first of all locally, perhaps as an extension of cached DataFrames.
>
> However, before we get full-blown update support, I would suggest two enhancements that are fairly straightforward with the current design. If they make sense, please let me know and I'll add them as JIRAs:
>
> 1. Checkpoint support for DataFrames - as mentioned, this can be as simple as saving to a Parquet file or some other format, but it would not require explicitly re-reading the file to alter the lineage, and it would also prune the logical plan. Alternatively, checkpointing a cached DataFrame can delegate to checkpointing the underlying RDD, but again needs to prune the logical plan.
>
> 2. Efficient transformation of cached DataFrames into cached DataFrames - an efficient copy-on-write mechanism can be used to avoid unpacking CachedBatches (row groups) into InternalRows when building a cached DataFrame out of a source cached DataFrame through transformations (like an outer join) that only affect a small subset of rows. Statistics and partitioning information can be used to determine which row groups are affected and which can be copied *by reference* unchanged. This would effectively allow performing immutable updates of cached DataFrames in scenarios like streaming, or other iterative use cases like ML.
>
> Thanks,
> Cristian
>
> On 16 November 2015 at 08:30, Mark Hamstra <m...@clearstorydata.com> wrote:
>
>> FiloDB is also closely related. https://github.com/tuplejump/FiloDB
>>
>> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>
>>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop input/output format support:
>>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>>
>>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> This (updates) is something we are going to think about in the next release or two.
>>>>
>>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:
>>>>
>>>>> Sorry, apparently I only replied to Reynold when I meant to copy the list as well, so I'm self-replying and taking the opportunity to illustrate with an example.
>>>>>
>>>>> Basically I want to conceptually do this:
>>>>>
>>>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
>>>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>>>>
>>>>> bigDf.cache()
>>>>>
>>>>> bigDf.registerTempTable("big")
>>>>> deltaDf.registerTempTable("delta")
>>>>>
>>>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta ON big.k = delta.k")
>>>>>
>>>>> newBigDf.cache()
>>>>> bigDf.unpersist()
>>>>>
>>>>> This is essentially an update of keys "1" and "50000" only, in a dataset of 1 million keys.
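>>>>>
>>>>> The same merge written against the DataFrame API, wrapped in a helper so it can be applied once per incoming delta (e.g. per streaming micro-batch), would look roughly like this - purely illustrative; the mergeDelta name and the "add the delta into the existing value" semantics are just for this example:
>>>>>
>>>>> import org.apache.spark.sql.DataFrame
>>>>> import org.apache.spark.sql.functions._
>>>>>
>>>>> def mergeDelta(state: DataFrame, delta: DataFrame): DataFrame = {
>>>>>   val merged = state.as("s")
>>>>>     .join(delta.as("d"), col("s.k") === col("d.k"), "left_outer")
>>>>>     .select(col("s.k").as("k"),
>>>>>             (col("s.v") + coalesce(col("d.v"), lit(0))).as("v"))
>>>>>   merged.cache()   // today this re-materialises every row group, even untouched ones
>>>>>   merged.count()   // force materialisation before dropping the old copy
>>>>>   state.unpersist()
>>>>>   merged
>>>>> }
>>>>>
>>>>> Functionally this works today; the problem is that the cache()/unpersist() pair copies and re-caches all 1 million rows just to update 2 of them.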
>>>>> This can be achieved efficiently if the join preserved the cached blocks that are unaffected, and only copied and mutated the 2 affected blocks corresponding to the matching join keys.
>>>>>
>>>>> Statistics can determine which blocks actually need mutating. Note also that shuffling is not required, assuming both DataFrames are pre-partitioned by the same key K.
>>>>>
>>>>> In SQL this could actually be expressed as an UPDATE statement or, for more generalised use, as a MERGE UPDATE: https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>>>
>>>>> While this may seem like a very special-case optimisation, it would effectively implement UPDATE support for cached DataFrames, for both optimal and non-optimal usage.
>>>>>
>>>>> I appreciate there's quite a lot here, so thank you for taking the time to consider it.
>>>>>
>>>>> Cristian
>>>>>
>>>>> On 12 November 2015 at 15:49, Cristian O <cristian.b.op...@googlemail.com> wrote:
>>>>>
>>>>>> Hi Reynold,
>>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> Parquet may very well be used as the underlying implementation, but this is about more than a particular storage representation.
>>>>>>
>>>>>> There are a few things here that are inter-related and open different possibilities, so it's hard to structure, but I'll give it a try:
>>>>>>
>>>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as Parquet, using that as a checkpoint would currently require explicitly reading it back. A proper checkpoint implementation would just save (perhaps asynchronously) and prune the logical plan, while allowing the same DF, now backed by the checkpoint, to keep being used.
>>>>>>
>>>>>> It's important to prune the logical plan to avoid all kinds of issues that may arise from unbounded expansion with iterative use cases, like this one I encountered recently: https://issues.apache.org/jira/browse/SPARK-11596
>>>>>>
>>>>>> But really what I'm after here is:
>>>>>>
>>>>>> 2. Efficient updating of cached DataFrames - the main use case here is keeping a relatively large dataset cached and updating it iteratively from streaming. For example, one would like to perform ad-hoc queries on an incrementally updated, cached DataFrame. I expect this is already becoming an increasingly common use case. Note that the dataset may require merging (like adding) or overriding values by key, so simply appending is not sufficient.
>>>>>>
>>>>>> This is very similar in concept to updateStateByKey for regular RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch level (the row blocks of the columnar representation).
>>>>>>
>>>>>> This can currently be simulated with UNION or (OUTER) JOINs, however it is very inefficient, as it requires copying and re-caching the entire dataset and unpersisting the original one. There are also the aforementioned problems with unbounded logical plans (physical plans are fine).
>>>>>>
>>>>>> These two together - checkpointing and updating cached DataFrames - would give fault-tolerant, efficient updating of DataFrames, meaning streaming apps can take advantage of the compact columnar representation and Tungsten optimisations.
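>>>>>>
>>>>>> For (1), the closest thing available today is the manual round trip below - just a sketch, with a caller-supplied path that a real checkpoint implementation would of course manage itself:
>>>>>>
>>>>>> import org.apache.spark.sql.{DataFrame, SQLContext}
>>>>>>
>>>>>> // Prunes the logical plan only by paying for a full, blocking write and
>>>>>> // an explicit re-read; a built-in checkpoint could do the save (possibly
>>>>>> // asynchronously) and swap the lineage under the same DF instead.
>>>>>> def manualCheckpoint(df: DataFrame, sqlContext: SQLContext, path: String): DataFrame = {
>>>>>>   df.write.parquet(path)
>>>>>>   sqlContext.read.parquet(path)
>>>>>> }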
>>>>>> I'm not quite sure whether something like this can be achieved by other means or has been investigated before, which is why I'm looking for feedback here.
>>>>>>
>>>>>> While one could use external data stores, they would add an IO penalty, plus most of what's available at the moment is either HDFS (extremely inefficient for updates) or key-value stores that have a 5-10x space overhead over columnar formats.
>>>>>>
>>>>>> Thanks,
>>>>>> Cristian
>>>>>>
>>>>>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>>>>>>
>>>>>>> Thanks for the email. Can you explain what the difference is between this and existing formats such as Parquet/ORC?
>>>>>>>
>>>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was wondering if there's any planned support for local disk columnar storage.
>>>>>>>>
>>>>>>>> This could be an extension of the in-memory columnar store, or possibly something similar to the recently added local checkpointing for RDDs.
>>>>>>>>
>>>>>>>> It could also have the added benefit of enabling iterative usage for DataFrames by pruning the query plan through local checkpoints.
>>>>>>>>
>>>>>>>> A further enhancement would be to add update support to the columnar format (in the immutable, copy-on-write sense of course), by maintaining references to unchanged row blocks and only copying and mutating the ones that have changed.
>>>>>>>>
>>>>>>>> A use case here is streaming and merging updates into a large dataset that can be efficiently stored internally in a columnar format, rather than accessed through a more inefficient external data store like HDFS or Cassandra.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cristian
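
For reference, the RDD-level local checkpointing mentioned at the bottom of the thread looks roughly like this (illustrative only; sc is an existing SparkContext). There is no DataFrame equivalent today, and pruning the logical plan in the same way is essentially what the JIRA above asks for:

import org.apache.spark.rdd.RDD

// localCheckpoint() truncates the RDD lineage using Spark's caching layer,
// with no checkpoint directory or reliable storage required. Going through
// df.rdd does not help a DataFrame, since its logical plan is left untouched.
val rdd: RDD[(Int, Int)] = sc.parallelize(1 to 1000000).map(i => (i, 1))
rdd.localCheckpoint()  // mark for lineage truncation via cached blocks
rdd.count()            // first action materialises the (local) checkpoint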