Cloudera's Kudu also looks interesting here (getkudu.io) - Hadoop input/output format support: https://github.com/cloudera/kudu/blob/master/java/kudu-mapreduce/src/main/java/org/kududb/mapreduce/KuduTableInputFormat.java
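For anyone curious what wiring that up from Spark would look like, here is a rough sketch using the generic newAPIHadoopRDD entry point. Treat it as an illustration only: the key/value classes are what the linked KuduTableInputFormat appears to use, and the Configuration setup (master address, table name) is left as a placeholder, since the kudu-mapreduce module configures those entries through its own helpers rather than keys I'd want to guess at here.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.kududb.client.RowResult
import org.kududb.mapreduce.KuduTableInputFormat

// The InputFormat has to be told which Kudu master and table to read; the
// kudu-mapreduce module ships utilities for populating these Configuration
// entries -- see the linked repo for the exact setup, it is not shown here.
val conf: Configuration = ??? // placeholder: master address + table name configured

// newAPIHadoopRDD is the standard Spark entry point for mapreduce-API
// InputFormats; KuduTableInputFormat appears to yield (NullWritable, RowResult) pairs.
val kuduRows = sqlContext.sparkContext.newAPIHadoopRDD(
  conf,
  classOf[KuduTableInputFormat],
  classOf[NullWritable],
  classOf[RowResult])

// From here the RowResults can be mapped into Rows or case classes and
// turned into a DataFrame as usual.
val keys = kuduRows.map { case (_, row) => row.getInt(0) }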
On Mon, Nov 16, 2015 at 7:52 AM, Reynold Xin <r...@databricks.com> wrote:

> This (updates) is something we are going to think about in the next release or two.
>
> On Thu, Nov 12, 2015 at 8:57 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:
>
>> Sorry, apparently I only replied to Reynold; I meant to copy the list as well, so I'm self-replying and taking the opportunity to illustrate with an example.
>>
>> Basically I want to conceptually do this:
>>
>> val bigDf = sqlContext.sparkContext.parallelize((1 to 1000000)).map(i => (i, 1)).toDF("k", "v")
>> val deltaDf = sqlContext.sparkContext.parallelize(Array(1, 50000)).map(i => (i, 1)).toDF("k", "v")
>>
>> bigDf.cache()
>>
>> bigDf.registerTempTable("big")
>> deltaDf.registerTempTable("delta")
>>
>> val newBigDf = sqlContext.sql("SELECT big.k, big.v + IF(delta.v is null, 0, delta.v) FROM big LEFT JOIN delta on big.k = delta.k")
>>
>> newBigDf.cache()
>> bigDf.unpersist()
>>
>> This is essentially an update of keys "1" and "50000" only, in a dataset of 1 million keys.
>>
>> This could be achieved efficiently if the join preserved the cached blocks that are unaffected, and only copied and mutated the 2 affected blocks corresponding to the matching join keys.
>>
>> Statistics can determine which blocks actually need mutating. Note also that shuffling is not required, assuming both dataframes are pre-partitioned by the same key K.
>>
>> In SQL this could actually be expressed as an UPDATE statement, or for more generalized use as a MERGE UPDATE: https://technet.microsoft.com/en-us/library/bb522522(v=sql.105).aspx
>>
>> While this may seem like a very special-case optimization, it would effectively implement UPDATE support for cached DataFrames, for both optimal and non-optimal usage.
>>
>> I appreciate there's quite a lot here, so thank you for taking the time to consider it.
>>
>> Cristian
>>
>> On 12 November 2015 at 15:49, Cristian O <cristian.b.op...@googlemail.com> wrote:
>>
>>> Hi Reynold,
>>>
>>> Thanks for your reply.
>>>
>>> Parquet may very well be used as the underlying implementation, but this is about more than a particular storage representation.
>>>
>>> There are a few things here that are inter-related and open up different possibilities, so it's hard to structure, but I'll give it a try:
>>>
>>> 1. Checkpointing DataFrames - while a DF can be saved locally as parquet, just using that as a checkpoint would currently require explicitly reading it back. A proper checkpoint implementation would save (perhaps asynchronously) and prune the logical plan, while allowing the same DF, now backed by the checkpoint, to keep being used.
>>>
>>> It's important to prune the logical plan to avoid all kinds of issues that may arise from unbounded expansion in iterative use cases, like this one I encountered recently: https://issues.apache.org/jira/browse/SPARK-11596
>>>
>>> But really what I'm after here is:
>>>
>>> 2. Efficient updating of cached DataFrames - the main use case here is keeping a relatively large dataset cached and updating it iteratively from streaming. For example, one would like to perform ad-hoc queries on an incrementally updated, cached DataFrame. I expect this is already becoming an increasingly common use case. Note that the dataset may require merging (like adding) or overriding values by key, so simply appending is not sufficient.
>>> This is very similar in concept to updateStateByKey for regular RDDs, i.e. an efficient copy-on-write mechanism, albeit perhaps at CachedBatch level (the row blocks of the columnar representation).
>>>
>>> This can currently be simulated with UNION or (OUTER) JOINs, but that is very inefficient, as it requires copying and recaching the entire dataset and unpersisting the original one. There are also the aforementioned problems with unbounded logical plans (physical plans are fine).
>>>
>>> These two together, checkpointing and updating cached DataFrames, would give fault-tolerant, efficient updating of DataFrames, meaning streaming apps can take advantage of the compact columnar representation and the Tungsten optimisations.
>>>
>>> I'm not quite sure whether something like this can be achieved by other means or has been investigated before, hence I'm looking for feedback here.
>>>
>>> While one could use external data stores, they would add an IO penalty, plus most of what's available at the moment is either HDFS (extremely inefficient for updates) or key-value stores that have 5-10x space overhead over columnar formats.
>>>
>>> Thanks,
>>> Cristian
>>>
>>> On 12 November 2015 at 03:31, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> Thanks for the email. Can you explain what the difference is between this and existing formats such as Parquet/ORC?
>>>>
>>>> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <cristian.b.op...@googlemail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was wondering if there's any planned support for local disk columnar storage.
>>>>>
>>>>> This could be an extension of the in-memory columnar store, or possibly something similar to the recently added local checkpointing for RDDs.
>>>>>
>>>>> This could also have the added benefit of enabling iterative usage for DataFrames by pruning the query plan through local checkpoints.
>>>>>
>>>>> A further enhancement would be to add update support to the columnar format (in the immutable, copy-on-write sense of course), by maintaining references to unchanged row blocks and only copying and mutating the ones that have changed.
>>>>>
>>>>> A use case here is streaming and merging updates into a large dataset that can be stored efficiently in an internal columnar format, rather than going to a more inefficient external data store like HDFS or Cassandra.
>>>>>
>>>>> Thanks,
>>>>> Cristian
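To make the cost described above concrete, here is roughly what the current workaround looks like against the 1.5/1.6-era DataFrame API, written as a sketch rather than any existing API: each batch of per-key deltas forces a full copy and re-cache of the state, and the growing logical plan can only be pruned by an explicit parquet round trip. The function names, column names (k, v) and checkpoint path are illustrative.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Apply one batch of per-key deltas to a cached "state" DataFrame(k, v).
// The whole dataset is rewritten and recached even if only a few keys changed,
// which is exactly the copy that block-level copy-on-write would avoid.
def applyDelta(state: DataFrame, delta: DataFrame): DataFrame = {
  val updated = state.as("s")
    .join(delta.as("d"), col("s.k") === col("d.k"), "left_outer")
    .select(col("s.k").as("k"),
            (col("s.v") + coalesce(col("d.v"), lit(0))).as("v"))
  updated.cache()     // materialize the new copy
  state.unpersist()   // drop the old copy; nothing is shared between the two
  updated
}

// Every join above grows the logical plan, so it periodically has to be pruned
// the only way currently available: write the data out and read it back in --
// the explicit round trip a proper DataFrame checkpoint would hide.
def pruneByParquetRoundTrip(sqlContext: SQLContext, df: DataFrame, path: String): DataFrame = {
  df.write.mode("overwrite").parquet(path)
  val reloaded = sqlContext.read.parquet(path).cache()
  df.unpersist()
  reloaded
}

With the block-level copy-on-write proposed in the thread, applyDelta would only need to rewrite the cached blocks that actually contain matched keys (keys 1 and 50000 in the earlier example) and could share everything else with the previous version.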
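For comparison, the RDD-side analogue mentioned in the thread, Spark Streaming's updateStateByKey, already expresses the per-key merge-or-overwrite semantics, albeit at row granularity and without the columnar representation and Tungsten benefits being asked for here. A minimal sketch, assuming a DStream of (key, increment) pairs and a StreamingContext with a checkpoint directory already configured:

import org.apache.spark.streaming.dstream.DStream

// deltas: per-batch (key, increment) pairs; the returned DStream carries the
// merged running state for every key seen so far.
def runningState(deltas: DStream[(Int, Long)]): DStream[(Int, Long)] =
  deltas.updateStateByKey[Long] { (newValues: Seq[Long], current: Option[Long]) =>
    // "merging (like adding)"; returning only the latest value instead would overwrite
    Some(current.getOrElse(0L) + newValues.sum)
  }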