I just realized we had an earlier SPIP on a similar topic: https://issues.apache.org/jira/browse/SPARK-24579

Perhaps we should tie the two together. IIUC, you'd want to expose the existing ColumnarBatch API, but also provide utilities to directly convert from/to Arrow.
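As a rough sketch of what such a conversion utility could look like, building on the existing ArrowColumnVector wrapper (the arrowToBatch helper below is hypothetical, not something Spark provides today):

    import scala.collection.JavaConverters._
    import org.apache.arrow.vector.VectorSchemaRoot
    import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}

    // Hypothetical utility: wrap the vectors of an Arrow VectorSchemaRoot in a
    // ColumnarBatch without copying the underlying buffers.
    def arrowToBatch(root: VectorSchemaRoot): ColumnarBatch = {
      val columns: Array[ColumnVector] =
        root.getFieldVectors.asScala.map(v => new ArrowColumnVector(v): ColumnVector).toArray
      val batch = new ColumnarBatch(columns)
      batch.setNumRows(root.getRowCount)
      batch
    }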
On Thu, Apr 11, 2019 at 7:13 AM, Bobby Evans <bo...@apache.org> wrote:

> The SPIP has been up for almost 6 days now with really no discussion on it. I am hopeful that means it's okay and we are good to call a vote on it, but I want to give everyone one last chance to take a look and comment. If there are no comments by tomorrow I think we will start a vote for this.

> Thanks,

> Bobby

> On Fri, Apr 5, 2019 at 2:24 PM Bobby Evans <bo...@apache.org> wrote:

>> I just filed SPARK-27396 as the SPIP for this proposal. Please use that JIRA for further discussions.

>> Thanks for all of the feedback,

>> Bobby

>> On Wed, Apr 3, 2019 at 7:15 PM Bobby Evans <bo...@apache.org> wrote:

>>> I am still working on the SPIP and should get it up in the next few days. I have the basic text more or less ready, but I want to get a high-level API concept ready too, just to have something more concrete. I have not really done much with contributing new features to Spark, so I am not sure where a design document fits in here: neither http://spark.apache.org/improvement-proposals.html nor http://spark.apache.org/contributing.html mentions a design document anywhere. I am happy to put one up, but I was hoping the API concept would cover most of that.

>>> Thanks,

>>> Bobby

>>> On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu <liurenjie2...@gmail.com> wrote:

>>>> Hi, Bobby:
>>>> Do you have a design doc? I'm also interested in this topic and want to help contribute.

>>>> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans <bo...@apache.org> wrote:

>>>>> Thanks to everyone for the feedback.

>>>>> Overall the feedback has been really positive for exposing columnar as a processing option to users. I'll write up a SPIP on the proposed changes to support columnar processing (not necessarily implement it) and then ping the list again for more feedback and discussion.

>>>>> Thanks again,

>>>>> Bobby

>>>>> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <r...@databricks.com> wrote:

>>>>>> I just realized I didn't make my stance very clear here ... here's another try:

>>>>>> I think it's a no-brainer to have a good columnar UDF interface. This would facilitate a lot of high-performance applications, e.g. GPU-based acceleration for machine learning algorithms.

>>>>>> On rewriting the entire internals of Spark SQL to leverage columnar processing, I don't see enough evidence to suggest that's a good idea yet.

>>>>>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <bo...@apache.org> wrote:

>>>>>>> Kazuaki Ishizaki,

>>>>>>> Yes, ColumnarBatchScan does provide a framework for doing code generation for the processing of columnar data. I have to admit that I don't have a deep understanding of the code generation piece, so if I get something wrong please correct me. From what I have seen, only input formats currently inherit from ColumnarBatchScan, and judging from the comments in the trait:

>>>>>>>     /**
>>>>>>>      * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>>>>>>>      * This is called once per [[ColumnarBatch]].
>>>>>>>      */
>>>>>>>     (https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43)

>>>>>>> it appears that ColumnarBatchScan is really only intended to pull the data out of the batch, not to process that data in a columnar fashion; that is the Loading stage that you mentioned.
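>>>>>>> In other words, the consumption side is still row-at-a-time; conceptually it is close to the hand-written loop below (a simplified sketch, not the actual generated code, assuming the first column is a non-nested INT column):

>>>>>>>     import org.apache.spark.sql.vectorized.ColumnarBatch
>>>>>>>
>>>>>>>     // Simplified sketch: values are pulled out of the ColumnVector one
>>>>>>>     // row at a time and handed to row-based operators downstream.
>>>>>>>     def sumFirstColumn(batch: ColumnarBatch): Long = {
>>>>>>>       val col = batch.column(0)
>>>>>>>       var sum = 0L
>>>>>>>       var rowId = 0
>>>>>>>       while (rowId < batch.numRows()) {
>>>>>>>         if (!col.isNullAt(rowId)) {  // per-element null check in the row loop
>>>>>>>           sum += col.getInt(rowId)
>>>>>>>         }
>>>>>>>         rowId += 1
>>>>>>>       }
>>>>>>>       sum
>>>>>>>     }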
>>>>>>>> The SIMDzation or GPUization capability depends on a compiler that translates the code generated by the whole-stage codegen into native code.

>>>>>>> To be able to support vectorized processing, Hive stayed with pure Java and let the JVM detect and do the SIMDzation of the code itself. To make that happen they wrote loops that go through each element in a column and removed all conditionals from the body of the loops. To the best of my knowledge, that would still require a separate code path, like the one I am proposing, so that the different processing phases generate code the JVM can compile down to SIMD instructions. The code generated today is full of per-element null checks, which would prevent the SIMD operations we want. Also, the intermediate results are often stored in UnsafeRow instances. That is really fast for row-based processing, but I believe the complexity of how they work would prevent the JVM from vectorizing the processing. If you have a better way to take Java code and vectorize it, we should put it into OpenJDK instead of Spark so everyone can benefit from it.
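>>>>>>> To make the contrast concrete, the first loop below has the shape the JIT can auto-vectorize, while the second is roughly the shape of today's row-based generated code (an illustrative sketch only, not code from either Hive or Spark):

>>>>>>>     // Conditional-free loop over primitive arrays: the JIT can turn the
>>>>>>>     // body into SIMD instructions.
>>>>>>>     def addColumns(a: Array[Int], b: Array[Int], out: Array[Int]): Unit = {
>>>>>>>       var i = 0
>>>>>>>       while (i < out.length) {
>>>>>>>         out(i) = a(i) + b(i)
>>>>>>>         i += 1
>>>>>>>       }
>>>>>>>     }
>>>>>>>
>>>>>>>     // Per-element null checks inside the loop body defeat auto-vectorization.
>>>>>>>     def addColumnsWithNulls(a: Array[Int], aNull: Array[Boolean],
>>>>>>>                             b: Array[Int], bNull: Array[Boolean],
>>>>>>>                             out: Array[Int], outNull: Array[Boolean]): Unit = {
>>>>>>>       var i = 0
>>>>>>>       while (i < out.length) {
>>>>>>>         if (aNull(i) || bNull(i)) outNull(i) = true
>>>>>>>         else out(i) = a(i) + b(i)
>>>>>>>         i += 1
>>>>>>>       }
>>>>>>>     }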
>>>>>>> Trying to compile directly from generated Java code to something a GPU can process is something we are tackling, but we decided to go a different route from what you proposed. From talking with several compiler experts here at NVIDIA, my understanding is that IBM, in partnership with NVIDIA, attempted in the past to extend the JVM to run at least partially on GPUs, but it was really difficult to get right, especially with how Java does memory management and memory layout.

>>>>>>> To avoid that complexity we decided to split the JITing into two separate pieces. I didn't mention any of this before because this discussion was intended to be just about the memory layout support, not GPU processing. The first piece would take the Catalyst AST and produce CUDA code directly from it. If done properly, we should be able to do the selection and projection phases within a single kernel. The biggest issue comes with UDFs, as they cannot easily be vectorized for the CPU or GPU. To deal with that, we have a prototype written by the compiler team, tackling SPARK-14083, which can translate basic UDFs into Catalyst expressions. If a UDF is too complicated or covers operations not yet supported, it falls back to the original UDF processing. I don't know how close the team is to submitting a SPIP or a patch for it, but I do know that they have some very basic operations working. The big issue is that it requires Java 11+ so it can use standard APIs to get the bytecode of Scala UDFs.
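>>>>>>> To illustrate what that translation amounts to at the DataFrame level (this is not the prototype's code, just a hypothetical before/after):

>>>>>>>     import org.apache.spark.sql.SparkSession
>>>>>>>     import org.apache.spark.sql.functions.{col, udf}
>>>>>>>
>>>>>>>     val spark = SparkSession.builder().appName("udf-translation-sketch").getOrCreate()
>>>>>>>     import spark.implicits._
>>>>>>>
>>>>>>>     val df = Seq(1, 2, 3).toDF("x")
>>>>>>>
>>>>>>>     // Opaque Scala UDF: a black box to Catalyst, so it cannot take part in
>>>>>>>     // codegen or columnar execution.
>>>>>>>     val plusOne = udf((x: Int) => x + 1)
>>>>>>>     val withUdf = df.select(plusOne(col("x")).as("y"))
>>>>>>>
>>>>>>>     // What the translation would effectively produce: the same logic as a
>>>>>>>     // Catalyst expression that the optimizer and codegen understand.
>>>>>>>     val translated = df.select((col("x") + 1).as("y"))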
>>>>>>> We split it this way because we thought it would be the simplest to implement, and because it would provide a benefit to more than just GPU-accelerated queries.

>>>>>>> Thanks,

>>>>>>> Bobby

>>>>>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <ishiz...@jp.ibm.com> wrote:

>>>>>>>> Looks like an interesting discussion. Let me describe the current structure and the remaining issues. This is orthogonal to the cost-benefit trade-off discussion.

>>>>>>>> The code generation basically consists of three parts:
>>>>>>>> 1. Loading
>>>>>>>> 2. Selection (map, filter, ...)
>>>>>>>> 3. Projection

>>>>>>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and the table cache) is well abstracted by the ColumnVector class (https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java). Combined with ColumnarBatchScan, whole-stage code generation generates code that gets values directly from the columnar storage if there is no row-based operation. Note: the current master does not support Arrow as a data source. However, I think it is not technically hard to support Arrow.

>>>>>>>> 2. The current whole-stage codegen generates code for element-wise selection (excluding sort and join). The SIMDzation or GPUization capability depends on a compiler that translates the code generated by the whole-stage codegen into native code.

>>>>>>>> 3. The current projection assumes row-oriented data for its output; I think that is the part Wenchen pointed out.

>>>>>>>> My slides https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41 may help illustrate the above issues and a possible implementation.

>>>>>>>> FYI: NVIDIA will present an approach to exploiting GPUs with Arrow through Python at SAIS 2019 (https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110). I think it uses Spark's Python UDF support with Arrow.

>>>>>>>> P.S. I will give a presentation about in-memory data storage for Spark at SAIS 2019 (https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40) :)

>>>>>>>> Kazuaki Ishizaki

>>>>>>>> From: Wenchen Fan <cloud0...@gmail.com>
>>>>>>>> To: Bobby Evans <bo...@apache.org>
>>>>>>>> Cc: Spark dev list <dev@spark.apache.org>
>>>>>>>> Date: 2019/03/26 13:53
>>>>>>>> Subject: Re: [DISCUSS] Spark Columnar Processing

>>>>>>>> Do you have some initial perf numbers? It seems fine to me to remain row-based inside Spark with whole-stage codegen, and convert rows to columnar batches when communicating with external systems.

>>>>>>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bo...@apache.org> wrote:

>>>>>>>> This thread is to discuss adding support for data frame processing using an in-memory columnar format compatible with Apache Arrow. My main goal is to lay the groundwork so we can add support for GPU-accelerated processing of data frames, but this feature has a number of other benefits. Spark currently supports Apache Arrow formatted data as an option to exchange data with Python for pandas UDF processing. There has also been discussion around extending this to allow exchanging data with other tools like PyTorch, TensorFlow, XGBoost, ... If Spark supports processing on Arrow-compatible data, it could eliminate the serialization/deserialization overhead when going between these systems. It would also allow optimizations on a CPU with SIMD instructions, similar to what Hive currently supports. Accelerated processing using a GPU is something we will start a separate discussion thread on, but I wanted to set the context a bit.

>>>>>>>> Jason Lowe, Tom Graves, and I created a prototype over the past few months to try to understand how to make this work. What we are proposing is based on lessons learned while building this prototype, but we really wanted to get feedback from the community early on. We will file a SPIP once we can get agreement that this is a good direction to go in.

>>>>>>>> The current support for columnar processing lets a Parquet or Orc file format return a ColumnarBatch inside an RDD[InternalRow] using Scala's type erasure. The code generation is aware that the RDD actually holds ColumnarBatches and generates code to loop through the data in each batch as InternalRows.

>>>>>>>> Instead, we propose a new set of APIs to work on an RDD[InternalColumnarBatch] instead of abusing type erasure. With this we propose adding a Rule, similar to how WholeStageCodeGen currently works. Each part of the physical SparkPlan would expose columnar support through a combination of traits and method calls. The rule would then decide when columnar processing would start and when it would end. Switching between columnar and row-based processing is not free, so the rule would make that decision based on an estimate of the cost of the transformation and the estimated speedup in processing time.
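>>>>>>>> One possible shape for this, as a very rough sketch (the trait and method names below are illustrative only; nothing here is final):

>>>>>>>>     import org.apache.spark.rdd.RDD
>>>>>>>>     import org.apache.spark.sql.vectorized.ColumnarBatch
>>>>>>>>
>>>>>>>>     // Hypothetical mix-in for physical operators that can produce columnar
>>>>>>>>     // output. A planner rule would look at which operators opt in, insert
>>>>>>>>     // row <-> columnar transitions where needed, and fall back to the
>>>>>>>>     // existing row-based execute() everywhere else.
>>>>>>>>     trait ColumnarSupport {
>>>>>>>>       /** Whether this operator can produce ColumnarBatches instead of rows. */
>>>>>>>>       def supportsColumnar: Boolean = false
>>>>>>>>
>>>>>>>>       /** Columnar version of execute(); only called when supportsColumnar is true. */
>>>>>>>>       def executeColumnar(): RDD[ColumnarBatch] =
>>>>>>>>         throw new UnsupportedOperationException("Columnar execution is not implemented")
>>>>>>>>     }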
>>>>>>>> This should allow us to disable columnar support by simply disabling the rule that modifies the physical SparkPlan. It should be minimal risk to the existing row-based code path, as that code should not be touched, and in many cases it could be reused to implement the columnar version. This also allows for small, easily manageable patches, rather than huge patches that no one wants to review.

>>>>>>>> As far as the memory layout is concerned, OnHeapColumnVector and OffHeapColumnVector are already really close to being Apache Arrow compatible, so shifting them over would be a relatively simple change. Alternatively, we could add a new implementation that is Arrow compatible if there are reasons to keep the old ones.

>>>>>>>> Again, this is just to get the discussion started. Any feedback is welcome, and we will file a SPIP once we feel the major changes we are proposing are acceptable.

>>>>>>>> Thanks,

>>>>>>>> Bobby Evans

>>>> --
>>>> Renjie Liu
>>>> Software Engineer, MVAD