This (updates) is something we are going to think about in the next release or two.
On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:

> Sorry, apparently I only replied to Reynold when I meant to copy the list as
> well, so I'm replying to myself and taking the opportunity to illustrate with
> an example.
>
> Basically I want to conceptually do this:
>
> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>
> bigDf.cache()
>
> bigDf.registerTempTable("big")
> deltaDf.registerTempTable("delta")
>
> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta ON big.k = delta.k")
>
> newBigDf.cache()
> bigDf.unpersist()
>
> This is essentially an update of keys "1" and "50000" only, in a dataset of
> 1 million keys.
>
> This could be done efficiently if the join preserved the cached blocks that
> are unaffected, and only copied and mutated the two affected blocks
> corresponding to the matching join keys.
>
> Statistics can determine which blocks actually need mutating. Note also that
> no shuffle is required, assuming both DataFrames are pre-partitioned by the
> same key K.
>
> In SQL this could actually be expressed as an UPDATE statement, or for more
> general use as a MERGE UPDATE:
> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>
> While this may seem like a very special-case optimization, it would
> effectively implement UPDATE support for cached DataFrames, for both optimal
> and non-optimal usage.
>
> I appreciate there's quite a lot here, so thank you for taking the time to
> consider it.
>
> Cristian
>
>
> On 12 November 2015 at 15:49, Cristian O <cristian.b.op...@googlemail.com> wrote:
>
>> Hi Reynold,
>>
>> Thanks for your reply.
>>
>> Parquet may very well be used as the underlying implementation, but this is
>> about more than a particular storage representation.
>>
>> There are a few things here that are interrelated and open up different
>> possibilities, so it's hard to structure, but I'll give it a try:
>>
>> 1. Checkpointing DataFrames - while a DF can be saved locally as Parquet,
>> just using that as a checkpoint would currently require explicitly reading
>> it back. A proper checkpoint implementation would save (perhaps
>> asynchronously) and prune the logical plan, while allowing the same DF to
>> keep being used, now backed by the checkpoint.
>>
>> It's important to prune the logical plan to avoid all kinds of issues that
>> may arise from unbounded expansion with iterative use cases, like this one
>> I encountered recently:
>> https://issues.apache.org/jira/browse/SPARK-11596
>>
>> But really what I'm after here is:
>>
>> 2. Efficient updating of cached DataFrames - the main use case here is
>> keeping a relatively large dataset cached and updating it iteratively from
>> streaming. For example, one would like to perform ad-hoc queries on an
>> incrementally updated, cached DataFrame. I expect this is already becoming
>> an increasingly common use case. Note that the dataset may require merging
>> (like adding) or overriding values by key, so simply appending is not
>> sufficient.
>>
>> This is very similar in concept to updateStateByKey for regular RDDs, i.e.
>> an efficient copy-on-write mechanism, albeit perhaps at CachedBatch level
>> (the row blocks of the columnar representation).
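A minimal sketch of the updateStateByKey pattern referred to above, assuming the Spark 1.5-era Streaming API; the socket source, port and running-count state are illustrative only:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("update-state-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/streaming-ckpt")  // stateful operations require a checkpoint directory

    // Illustrative source: one (key, 1) pair per incoming line.
    val deltas = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

    // Merge each batch's deltas into the running per-key state.
    val state = deltas.updateStateByKey[Int] { (newValues: Seq[Int], current: Option[Int]) =>
      Some(current.getOrElse(0) + newValues.sum)
    }

    state.print()
    ssc.start()
    ssc.awaitTermination()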
>> This can currently be simulated with UNION or (OUTER) JOINs, but that is
>> very inefficient as it requires copying and re-caching the entire dataset
>> and unpersisting the original one. There are also the aforementioned
>> problems with unbounded logical plans (physical plans are fine).
>>
>> These two together, checkpointing and updating cached DataFrames, would
>> give fault-tolerant, efficient updating of DataFrames, meaning streaming
>> apps could take advantage of the compact columnar representation and
>> Tungsten optimisations.
>>
>> I'm not quite sure whether something like this can be achieved by other
>> means or has been investigated before, hence why I'm looking for feedback
>> here.
>>
>> While one could use external data stores, they would add an IO penalty,
>> plus most of what's available at the moment is either HDFS (extremely
>> inefficient for updates) or key-value stores that have a 5-10x space
>> overhead over columnar formats.
>>
>> Thanks,
>> Cristian
>>
>>
>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Thanks for the email. Can you explain what the difference is between this
>>> and existing formats such as Parquet/ORC?
>>>
>>>
>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I was wondering if there's any planned support for local disk columnar
>>>> storage.
>>>>
>>>> This could be an extension of the in-memory columnar store, or possibly
>>>> something similar to the recently added local checkpointing for RDDs.
>>>>
>>>> This could also have the added benefit of enabling iterative usage of
>>>> DataFrames by pruning the query plan through local checkpoints.
>>>>
>>>> A further enhancement would be to add update support to the columnar
>>>> format (in the immutable, copy-on-write sense of course), by maintaining
>>>> references to unchanged row blocks and only copying and mutating the
>>>> ones that have changed.
>>>>
>>>> A use case here is streaming and merging updates into a large dataset
>>>> that can be efficiently stored internally in a columnar format, rather
>>>> than accessing a more inefficient external data store like HDFS or
>>>> Cassandra.
>>>>
>>>> Thanks,
>>>> Cristian
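A minimal sketch of the explicit save-and-read-back workaround described under point 1 above, assuming Spark 1.5-era APIs; the paths, the checkpointToParquet helper name and the unionAll update step are illustrative only, not an actual checkpoint API:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{DataFrame, SQLContext}

    val sc = new SparkContext(new SparkConf().setAppName("df-checkpoint-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // "Checkpoint" a DataFrame by writing it to Parquet and reading it back.
    // The round trip materialises the data and replaces the accumulated logical
    // plan with a plain Parquet scan, at the cost of an explicit write and read.
    def checkpointToParquet(df: DataFrame, path: String): DataFrame = {
      df.write.mode("overwrite").parquet(path)
      sqlContext.read.parquet(path)
    }

    // Iterative use: periodically re-assign to the pruned DataFrame so the plan
    // does not grow without bound across iterations (cf. SPARK-11596).
    var bigDf = sc.parallelize(1 to 1000000).map(i => (i, 1)).toDF("k", "v")
    for (iter <- 1 to 50) {
      val deltaDf = sc.parallelize(Seq(iter)).map(i => (i, 1)).toDF("k", "v")
      bigDf = bigDf.unionAll(deltaDf)  // stand-in for the real merge/update step
      if (iter % 10 == 0) {
        bigDf = checkpointToParquet(bigDf, s"/tmp/df-ckpt/$iter")
      }
    }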