Hi,

While these OSS efforts are interesting, they're for now quite unproven.
Personally would be much more interested in seeing Spark incrementally
moving towards supporting updating DataFrames on various storage
substrates, and first of all locally, perhaps as an extension of cached
DataFrames.

However before we get full blown update support, I would suggest two
enhancements that are fairly straightforward with the current design. If
they make sense please let me know and I'll add them as Jiras:

1. Checkpoint support for DataFrames - as mentioned this can be as simple
as saving to a parquet file or some other format, but would not require
re-reading the file to alter the lineage, and would also prune the logical
plan. Alternatively checkpointing a cached DataFrame can delegate to
checkpointing the underlying RDD but again needs to prune the logical plan.

2. Efficient transformation of cached DataFrames to cached DataFrames - an
efficient copy-on-write mechanism can be used to avoid unpacking
CachedBatches (row groups) into InternalRows when building a cached
DataFrame out of a source cached DataFrame through transformations (like an
outer join) that only affect a small subset of rows. Statistics and
partitioning information can be used to determine which row groups are
affected and which can be copied *by reference* unchanged. This would
effectively allow performing immutable updates of cached DataFrames in
scenarios like Streaming or other iterative use cases like ML.

Thanks,
Cristian



On 16 November 2015 at 08:30, Mark Hamstra <m...@clearstorydata.com> wrote:

> FiloDB is also closely reated.  https://github.com/tuplejump/FiloDB
>
> On Mon, Nov 16, 2015 at 12:24 AM, Nick Pentreath <nick.pentre...@gmail.com
> > wrote:
>
>> Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop
>> input/output format support:
>> https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
>>
>> On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> This (updates) is something we are going to think about in the next
>>> release or two.
>>>
>>> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <
>>> cristian.b.op...@googlemail.com> wrote:
>>>
>>>> Sorry, apparently only replied to Reynold, meant to copy the list as
>>>> well, so I'm self replying and taking the opportunity to illustrate with an
>>>> example.
>>>>
>>>> Basically I want to conceptually do this:
>>>>
>>>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => 
>>>> (i, 1)).toDF("k", "v")
>>>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i 
>>>> => (i, 1)).toDF("k", "v")
>>>>
>>>> bigDf.cache()
>>>>
>>>> bigDf.registerTempTable("big")
>>>> deltaDf.registerTempTable("delta")
>>>>
>>>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 
>>>> 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>>>
>>>> newBigDf.cache()
>>>> bigDf.unpersist()
>>>>
>>>>
>>>> This is essentially an update of keys "1" and "50000" only, in a
>>>> dataset of 1 million keys.
>>>>
>>>> This can be achieved efficiently if the join would preserve the cached
>>>> blocks that have been unaffected, and only copy and mutate the 2 affected
>>>> blocks corresponding to the matching join keys.
>>>>
>>>> Statistics can determine which blocks actually need mutating. Note also
>>>> that shuffling is not required assuming both dataframes are pre-partitioned
>>>> by the same key K.
>>>>
>>>> In SQL this could actually be expressed as an UPDATE statement or for a
>>>> more generalized use as a MERGE UPDATE:
>>>> https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>>>
>>>> While this may seem like a very special case optimization, it would
>>>> effectively implement UPDATE support for cached DataFrames, for both
>>>> optimal and non-optimal usage.
>>>>
>>>> I appreciate there's quite a lot here, so thank you for taking the time
>>>> to consider it.
>>>>
>>>> Cristian
>>>>
>>>>
>>>>
>>>> On 12 November 2015 at 15:49, Cristian O <
>>>> cristian.b.op...@googlemail.com> wrote:
>>>>
>>>>> Hi Reynold,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> Parquet may very well be used as the underlying implementation, but
>>>>> this is more than about a particular storage representation.
>>>>>
>>>>> There are a few things here that are inter-related and open different
>>>>> possibilities, so it's hard to structure, but I'll give it a try:
>>>>>
>>>>> 1. Checkpointing DataFrames - while a DF can be saved locally as
>>>>> parquet, just using that as a checkpoint would currently require 
>>>>> explicitly
>>>>> reading it back. A proper checkpoint implementation would just save
>>>>> (perhaps asynchronously) and prune the logical plan while allowing to
>>>>> continue using the same DF, now backed by the checkpoint.
>>>>>
>>>>> It's important to prune the logical plan to avoid all kinds of issues
>>>>> that may arise from unbounded expansion with iterative use-cases, like 
>>>>> this
>>>>> one I encountered recently:
>>>>> https://issues.apache.org/jira/browse/SPARK-11596
>>>>>
>>>>> But really what I'm after here is:
>>>>>
>>>>> 2. Efficient updating of cached DataFrames - The main use case here is
>>>>> keeping a relatively large dataset cached and updating it iteratively from
>>>>> streaming. For example one would like to perform ad-hoc queries on an
>>>>> incrementally updated, cached DataFrame. I expect this is already becoming
>>>>> an increasingly common use case. Note that the dataset may require merging
>>>>> (like adding) or overrriding values by key, so simply appending is not
>>>>> sufficient.
>>>>>
>>>>> This is very similar in concept with updateStateByKey for regular
>>>>> RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at
>>>>> CachedBatch level  (the row blocks for the columnar representation).
>>>>>
>>>>> This can be currently simulated with UNION or (OUTER) JOINs however is
>>>>> very inefficient as it requires copying and recaching the entire dataset,
>>>>> and unpersisting the original one. There are also the aforementioned
>>>>> problems with unbounded logical plans (physical plans are fine)
>>>>>
>>>>> These two together, checkpointing and updating cached DataFrames,
>>>>> would give fault-tolerant efficient updating of DataFrames, meaning
>>>>> streaming apps can take advantage of the compact columnar representation
>>>>> and Tungsten optimisations.
>>>>>
>>>>> I'm not quite sure if something like this can be achieved by other
>>>>> means or has been investigated before, hence why I'm looking for feedback
>>>>> here.
>>>>>
>>>>> While one could use external data stores, they would have the added IO
>>>>> penalty, plus most of what's available at the moment is either HDFS
>>>>> (extremely inefficient for updates) or key-value stores that have 5-10x
>>>>> space overhead over columnar formats.
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>>>>>
>>>>>> Thanks for the email. Can you explain what the difference is between
>>>>>> this and existing formats such as Parquet/ORC?
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
>>>>>> cristian.b.op...@googlemail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I was wondering if there's any planned support for local disk
>>>>>>> columnar storage.
>>>>>>>
>>>>>>> This could be an extension of the in-memory columnar store, or
>>>>>>> possibly something similar to the recently added local checkpointing for
>>>>>>> RDDs
>>>>>>>
>>>>>>> This could also have the added benefit of enabling iterative usage
>>>>>>> for DataFrames by pruning the query plan through local checkpoints.
>>>>>>>
>>>>>>> A further enhancement would be to add update support to the columnar
>>>>>>> format (in the immutable copy-on-write sense of course), by maintaining
>>>>>>> references to unchanged row blocks and only copying and mutating the 
>>>>>>> ones
>>>>>>> that have changed.
>>>>>>>
>>>>>>> A use case here is streaming and merging updates in a large dataset
>>>>>>> that can be efficiently stored internally in a columnar format, rather 
>>>>>>> than
>>>>>>> accessing a more inefficient external  data store like HDFS or 
>>>>>>> Cassandra.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cristian
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to