I just realized we had an earlier SPIP on a similar topic: 
https://issues.apache.org/jira/browse/SPARK-24579
Perhaps we should tie the two together. IIUC, you'd want to expose the existing
ColumnarBatch API, but also provide utilities to convert directly from/to Arrow.
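
For reference, here is a rough sketch of how Arrow data can already be wrapped
into the existing ColumnarBatch classes (illustrative only; it assumes the Arrow
Java library is on the classpath and glosses over allocator lifecycle):

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.IntVector
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}

val allocator = new RootAllocator(Long.MaxValue)
val ints = new IntVector("value", allocator)
ints.allocateNew(3)
(0 until 3).foreach(i => ints.setSafe(i, i * 10)) // fill the Arrow buffer
ints.setValueCount(3)

// ArrowColumnVector adapts an Arrow ValueVector to Spark's ColumnVector API
val cols: Array[ColumnVector] = Array(new ArrowColumnVector(ints))
val batch = new ColumnarBatch(cols)
batch.setNumRows(3)

A to/from-Arrow utility would presumably hide this plumbing behind a stable API.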

On Thu, Apr 11, 2019 at 7:13 AM, Bobby Evans < bo...@apache.org > wrote:

> 
> The SPIP has been up for almost 6 days now with really no discussion on
> it.  I am hopeful that means it's okay and we are good to call a vote on
> it, but I want to give everyone one last chance to take a look and
> comment.  If there are no comments by tomorrow I think we will start a vote
> for this.
> 
> 
> Thanks,
> 
> 
> Bobby
> 
> On Fri, Apr 5, 2019 at 2:24 PM Bobby Evans < bo...@apache.org > wrote:
> 
> 
>> I just filed SPARK-27396 as the SPIP for this proposal.  Please use that
>> JIRA for further discussions.
>> 
>> 
>> Thanks for all of the feedback,
>> 
>> 
>> Bobby
>> 
>> On Wed, Apr 3, 2019 at 7:15 PM Bobby Evans < bo...@apache.org > wrote:
>> 
>> 
>>> I am still working on the SPIP and should get it up in the next few days.
>>> I have the basic text more or less ready, but I want to get a high-level
>>> API concept ready too, just to have something more concrete.  I have not
>>> really done much with contributing new features to Spark, so I am not sure
>>> where a design document fits in here; neither
>>> http://spark.apache.org/improvement-proposals.html nor
>>> http://spark.apache.org/contributing.html mentions a design document
>>> anywhere.  I am happy to put one up, but I was hoping the API concept would
>>> cover most of that.
>>> 
>>> 
>>> Thanks,
>>> 
>>> 
>>> Bobby
>>> 
>>> On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu < liurenjie2...@gmail.com > wrote:
>>> 
>>> 
>>>> Hi Bobby,
>>>> Do you have a design doc? I'm also interested in this topic and want to
>>>> help contribute.
>>>> 
>>>> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans < bo...@apache.org > wrote:
>>>> 
>>>> 
>>>>> Thanks to everyone for the feedback.
>>>>> 
>>>>> 
>>>>> Overall, the feedback has been really positive on exposing columnar
>>>>> processing as an option to users.  I'll write up a SPIP on the proposed changes
>>>>> to support columnar processing (not necessarily implement it) and then
>>>>> ping the list again for more feedback and discussion.
>>>>> 
>>>>> 
>>>>> Thanks again,
>>>>> 
>>>>> 
>>>>> Bobby
>>>>> 
>>>>> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin < r...@databricks.com > wrote:
>>>>> 
>>>>> 
>>>>>> I just realized I didn't make it very clear my stance here ... here's
>>>>>> another try:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> I think it's a no-brainer to have a good columnar UDF interface. This
>>>>>> would facilitate a lot of high-performance applications, e.g. GPU-based
>>>>>> acceleration for machine learning algorithms.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On rewriting the entire internals of Spark SQL to leverage columnar
>>>>>> processing, I don't see enough evidence to suggest that's a good idea 
>>>>>> yet.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans < bo...@apache.org > wrote:
>>>>>> 
>>>>>>> Kazuaki Ishizaki,
>>>>>>> 
>>>>>>> 
>>>>>>> Yes, ColumnarBatchScan does provide a framework for doing code generation
>>>>>>> for the processing of columnar data.  I have to admit that I don't have a
>>>>>>> deep understanding of the code generation piece, so if I get something
>>>>>>> wrong please correct me.  From what I had seen, only input formats
>>>>>>> currently inherit from ColumnarBatchScan, and from the comments in the
>>>>>>> trait:
>>>>>>> 
>>>>>>> 
>>>>>>>   /**
>>>>>>>    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>>>>>>>    * This is called once per [[ColumnarBatch]].
>>>>>>>    */
>>>>>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> It appears that ColumnarBatchScan is really only intended to pull the data
>>>>>>> out of the batch, not to process that data in a columnar fashion; that is,
>>>>>>> the Loading stage that you mentioned.
>>>>>>> 
>>>>>>> 
>>>>>>> > The SIMDzation or GPUization capability depends on a compiler that
>>>>>>> > translates the code generated by the whole-stage codegen into native code.
>>>>>>> 
>>>>>>> To be able to support vectorized processing, Hive stayed with pure Java
>>>>>>> and let the JVM detect and do the SIMDization of the code.  To make that
>>>>>>> happen they created loops that go through each element in a column and
>>>>>>> removed all conditionals from the body of those loops.  To the best of my
>>>>>>> knowledge that would still require a separate code path, like the one I am
>>>>>>> proposing, so that the different processing phases generate code the JVM
>>>>>>> can compile down to SIMD instructions.  The currently generated code is
>>>>>>> full of null checks for each element, which would prevent the operations
>>>>>>> we want.  Also, the intermediate results are often stored in UnsafeRow
>>>>>>> instances.  This is really fast for row-based processing, but I believe
>>>>>>> the complexity of how they work would prevent the JVM from vectorizing the
>>>>>>> processing.  If you have a better way to take Java code and vectorize it,
>>>>>>> we should put it into OpenJDK instead of Spark so everyone can benefit
>>>>>>> from it.
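>>>>>>> 
>>>>>>> As a rough sketch of the kind of loop body the JVM can auto-vectorize
>>>>>>> (illustrative only: a simple add over primitive int arrays, with any null
>>>>>>> handling hoisted out of the loop):
>>>>>>> 
>>>>>>> def addColumns(a: Array[Int], b: Array[Int], out: Array[Int], n: Int): Unit = {
>>>>>>>   var i = 0
>>>>>>>   while (i < n) {
>>>>>>>     // tight, branch-free body: a good candidate for SIMD by HotSpot
>>>>>>>     out(i) = a(i) + b(i)
>>>>>>>     i += 1
>>>>>>>   }
>>>>>>> }
>>>>>>> 
>>>>>>> // By contrast, the code we generate today effectively does, per element:
>>>>>>> //   if (!row.isNullAt(0)) { ... UnsafeRow reads and writes ... }
>>>>>>> // and those per-element branches and row writes defeat auto-vectorization.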
>>>>>>> 
>>>>>>> 
>>>>>>> Trying to compile directly from the generated Java code to something a GPU
>>>>>>> can process is something we are tackling, but we decided to go a different
>>>>>>> route from what you proposed.  From talking with several compiler experts
>>>>>>> here at NVIDIA, my understanding is that IBM, in partnership with NVIDIA,
>>>>>>> attempted in the past to extend the JVM to run at least partially on GPUs,
>>>>>>> but it was really difficult to get right, especially with how Java does
>>>>>>> memory management and memory layout.
>>>>>>> 
>>>>>>> 
>>>>>>> To avoid that complexity we decided to split the JITing up into two
>>>>>>> separate pieces.  I didn't mention any of this before because this
>>>>>>> discussion was intended to just be about the memory layout support, not
>>>>>>> GPU processing.  The first part would be to take the Catalyst AST and
>>>>>>> produce CUDA code directly from it.  If done properly, we should be able
>>>>>>> to do the selection and projection phases within a single kernel.  The
>>>>>>> biggest issue comes with UDFs, as they cannot easily be vectorized for the
>>>>>>> CPU or GPU.  To deal with that, we have a prototype written by the
>>>>>>> compiler team that is trying to tackle SPARK-14083 and can translate basic
>>>>>>> UDFs into Catalyst expressions.  If the UDF is too complicated or covers
>>>>>>> operations not yet supported, it falls back to the original UDF
>>>>>>> processing.  I don't know how close the team is to submitting a SPIP or a
>>>>>>> patch for it, but I do know that they have some very basic operations
>>>>>>> working.  The big issue is that it requires Java 11+ so it can use
>>>>>>> standard APIs to get the bytecode of Scala UDFs.
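>>>>>>> 
>>>>>>> To make the translation idea a bit more concrete, here is a purely
>>>>>>> conceptual sketch of the target representation (the prototype works from
>>>>>>> bytecode; this just shows what a trivial UDF might map to):
>>>>>>> 
>>>>>>> import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, Literal}
>>>>>>> import org.apache.spark.sql.types.IntegerType
>>>>>>> 
>>>>>>> // A UDF like  (x: Int) => x + 1  applied to the first input column would be
>>>>>>> // replaced by the equivalent Catalyst expression tree, which Spark can then
>>>>>>> // codegen (or, in our case, translate to a GPU kernel) instead of calling
>>>>>>> // the opaque UDF:
>>>>>>> val translated = Add(BoundReference(0, IntegerType, nullable = false), Literal(1))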
>>>>>>> 
>>>>>>> 
>>>>>>> We split it this way because we thought it would be simplest to 
>>>>>>> implement,
>>>>>>> and because it would provide a benefit to more than just GPU accelerated
>>>>>>> queries.
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> 
>>>>>>> Bobby
>>>>>>> 
>>>>>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki < ishiz...@jp.ibm.com > wrote:
>>>>>>> 
>>>>>>> 
>>>>>>>> Looks interesting discussion.
>>>>>>>> Let me describe the current structure and remaining issues. This is
>>>>>>>> orthogonal to cost-benefit trade-off discussion.
>>>>>>>> 
>>>>>>>> The code generation basically consists of three parts.
>>>>>>>> 1. Loading
>>>>>>>> 2. Selection (map, filter, ...)
>>>>>>>> 3. Projection
>>>>>>>> 
>>>>>>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and the table cache) is
>>>>>>>> well abstracted by the ColumnVector class (
>>>>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
>>>>>>>> ). By combining it with ColumnarBatchScan, the whole-stage code generation
>>>>>>>> generates code to directly get values from the columnar storage if there
>>>>>>>> is no row-based operation.
>>>>>>>> Note: the current master does not support Arrow as a data source. However,
>>>>>>>> I think it is not technically hard to support Arrow.
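>>>>>>>> 
>>>>>>>> For illustration, consuming a batch through this abstraction looks roughly
>>>>>>>> like the following (a simplified hand-written sketch, not the actual
>>>>>>>> generated code):
>>>>>>>> 
>>>>>>>> import org.apache.spark.sql.vectorized.ColumnarBatch
>>>>>>>> 
>>>>>>>> def sumFirstColumn(batch: ColumnarBatch): Long = {
>>>>>>>>   val col = batch.column(0)          // ColumnVector for column 0
>>>>>>>>   var sum = 0L
>>>>>>>>   var i = 0
>>>>>>>>   while (i < batch.numRows()) {
>>>>>>>>     if (!col.isNullAt(i)) sum += col.getInt(i)
>>>>>>>>     i += 1
>>>>>>>>   }
>>>>>>>>   sum
>>>>>>>> }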
>>>>>>>> 
>>>>>>>> 2. The current whole-stage codegen generates code for element-wise
>>>>>>>> selection (excluding sort and join). The SIMDzation or GPUization
>>>>>>>> capability depends on a compiler that translates the code generated by the
>>>>>>>> whole-stage codegen into native code.
>>>>>>>> 
>>>>>>>> 3. The current Projection assumes row-oriented data storage; I think that
>>>>>>>> is the part that Wenchen pointed out.
>>>>>>>> 
>>>>>>>> My slides (
>>>>>>>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>>>>>>>> ) may help clarify the above issues and a possible implementation.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> FYI, NVIDIA will present an approach to exploiting GPUs with Arrow through
>>>>>>>> Python at SAIS 2019 (
>>>>>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110
>>>>>>>> ). I think that it uses the Python UDF support with Arrow in Spark.
>>>>>>>> 
>>>>>>>> P.S. I will give a presentation about in-memory data storage for Spark at
>>>>>>>> SAIS 2019 (
>>>>>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>>>>>>>> ) :)
>>>>>>>> 
>>>>>>>> Kazuaki Ishizaki
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> From:        Wenchen Fan < cloud0...@gmail.com >
>>>>>>>> 
>>>>>>>> To:        Bobby Evans < bo...@apache.org >
>>>>>>>> Cc:        Spark dev list < dev@spark.apache.org >
>>>>>>>> Date:        2019/03/26 13:53
>>>>>>>> Subject:        Re: [DISCUSS] Spark Columnar Processing
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Do you have some initial perf numbers? It seems fine to me to remain
>>>>>>>> row-based inside Spark with whole-stage-codegen, and convert rows to
>>>>>>>> columnar batches when communicating with external systems.
>>>>>>>> 
>>>>>>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans < bo...@apache.org > wrote:
>>>>>>>> This thread is to discuss adding in support for data frame processing
>>>>>>>> using an in-memory columnar format compatible with Apache Arrow.  My 
>>>>>>>> main
>>>>>>>> goal in this is to lay the groundwork so we can add in support for GPU
>>>>>>>> accelerated processing of data frames, but this feature has a number of
>>>>>>>> other benefits.  Spark currently supports Apache Arrow formatted data 
>>>>>>>> as
>>>>>>>> an option to exchange data with python for pandas UDF processing. There
>>>>>>>> has also been discussion around extending this to allow for exchanging
>>>>>>>> data with other tools like PyTorch, TensorFlow, XGBoost, etc. If Spark
>>>>>>>> supports processing on Arrow-compatible data it could eliminate the
>>>>>>>> serialization/deserialization overhead when going between these 
>>>>>>>> systems. 
>>>>>>>> It also would allow for doing optimizations on a CPU with SIMD
>>>>>>>> instructions similar to what Hive currently supports. Accelerated
>>>>>>>> processing using a GPU is something that we will start a separate
>>>>>>>> discussion thread on, but I wanted to set the context a bit.
>>>>>>>> Jason Lowe, Tom Graves, and I created a prototype over the past few 
>>>>>>>> months
>>>>>>>> to try and understand how to make this work.  What we are proposing is
>>>>>>>> based off of lessons learned when building this prototype, but we 
>>>>>>>> really
>>>>>>>> wanted to get feedback early on from the community. We will file a SPIP
>>>>>>>> once we can get agreement that this is a good direction to go in.
>>>>>>>> 
>>>>>>>> The current support for columnar processing lets a Parquet or Orc file
>>>>>>>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s
>>>>>>>> type erasure. The code generation is aware that the RDD actually holds
>>>>>>>> ColumnarBatches and generates code to loop through the data in each
>>>>>>>> batch
>>>>>>>> as InternalRows.
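>>>>>>>> 
>>>>>>>> Roughly, the type-erasure trick amounts to something like this (a
>>>>>>>> simplified sketch, not the actual generated code):
>>>>>>>> 
>>>>>>>> import scala.collection.JavaConverters._
>>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>>> import org.apache.spark.sql.catalyst.InternalRow
>>>>>>>> import org.apache.spark.sql.vectorized.ColumnarBatch
>>>>>>>> 
>>>>>>>> // The RDD is declared as RDD[InternalRow], but each element is really a
>>>>>>>> // ColumnarBatch; the consumer casts it back and walks it row by row.
>>>>>>>> def toRows(batches: RDD[InternalRow]): RDD[InternalRow] =
>>>>>>>>   batches.mapPartitions { iter =>
>>>>>>>>     iter.flatMap { r =>
>>>>>>>>       val batch = r.asInstanceOf[ColumnarBatch]  // relies on type erasure
>>>>>>>>       batch.rowIterator().asScala
>>>>>>>>     }
>>>>>>>>   }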
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Instead, we propose a new set of APIs to work on an
>>>>>>>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this 
>>>>>>>> we
>>>>>>>> propose adding in a Rule similar to how WholeStageCodeGen currently 
>>>>>>>> works.
>>>>>>>> Each part of the physical SparkPlan would expose columnar support 
>>>>>>>> through
>>>>>>>> a combination of traits and method calls. The rule would then decide 
>>>>>>>> when
>>>>>>>> columnar processing would start and when it would end. Switching 
>>>>>>>> between
>>>>>>>> columnar and row-based processing is not free, so the rule would make a
>>>>>>>> decision based on an estimate of the cost of the transformation and the
>>>>>>>> estimated speedup in processing time.
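>>>>>>>> 
>>>>>>>> To give a feel for the shape of what we have in mind, here is a
>>>>>>>> hypothetical sketch (names and signatures are placeholders, not the
>>>>>>>> proposed design):
>>>>>>>> 
>>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>>> import org.apache.spark.sql.catalyst.rules.Rule
>>>>>>>> import org.apache.spark.sql.execution.SparkPlan
>>>>>>>> import org.apache.spark.sql.vectorized.ColumnarBatch
>>>>>>>> 
>>>>>>>> // Operators that can produce columnar output would mix in something like:
>>>>>>>> trait ColumnarSupport { self: SparkPlan =>
>>>>>>>>   def supportsColumnar: Boolean
>>>>>>>>   def executeColumnar(): RDD[ColumnarBatch]
>>>>>>>> }
>>>>>>>> 
>>>>>>>> // A planner rule would walk the physical plan and insert row<->columnar
>>>>>>>> // transition nodes where the estimated speedup outweighs the conversion cost.
>>>>>>>> case class InsertColumnarTransitions(costThreshold: Double) extends Rule[SparkPlan] {
>>>>>>>>   override def apply(plan: SparkPlan): SparkPlan = plan // decision logic elided
>>>>>>>> }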
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> This should allow us to disable columnar support by simply disabling 
>>>>>>>> the
>>>>>>>> rule that modifies the physical SparkPlan.  It should be minimal risk 
>>>>>>>> to
>>>>>>>> the existing row-based code path, as that code should not be touched, 
>>>>>>>> and
>>>>>>>> in many cases could be reused to implement the columnar version.  This
>>>>>>>> also allows for small, easily manageable patches, rather than huge patches
>>>>>>>> that no one wants to review.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> As far as the memory layout is concerned, OnHeapColumnVector and
>>>>>>>> OffHeapColumnVector are already really close to being Apache Arrow
>>>>>>>> compatible, so shifting them over would be a relatively simple change.
>>>>>>>> Alternatively, we could add a new Arrow-compatible implementation if there
>>>>>>>> are reasons to keep the old ones.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Again, this is just to get the discussion started; any feedback is
>>>>>>>> welcome, and we will file a SPIP on it once we feel that the major changes
>>>>>>>> we are proposing are acceptable.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Bobby Evans
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Renjie Liu
>>>> Software Engineer, MVAD
>>>> 
>>> 
>>> 
>> 
>> 
> 
>
