This looks like an interesting discussion.
Let me describe the current structure and the remaining issues. This is
orthogonal to the cost-benefit trade-off discussion.

The code generation basically consists of three parts:
1. Loading
2. Selection (map, filter, ...)
3. Projection

1. Columnar storage (e.g. Parquet, ORC, Arrow, and the table cache) is well
abstracted by the ColumnVector class (
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
). Combined with ColumnarBatchScan, the whole-stage code generation
generates code that reads values directly from the columnar storage as long
as there is no row-based operation.
Note: the current master does not support Arrow as a data source. However,
I think it is not technically hard to support Arrow.
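
As a rough sketch (the column ordinal and type below are just assumptions
for illustration), reading values through ColumnVector looks like this; the
generated code conceptually does the same thing in a tight loop.

import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

// Sketch: sum an INT column by reading values directly from the columnar
// storage through ColumnVector, similar in spirit to the generated loop.
def sumIntColumn(batch: ColumnarBatch): Long = {
  val col: ColumnVector = batch.column(0)  // assumption: ordinal 0 is an INT column
  var sum = 0L
  var i = 0
  while (i < batch.numRows()) {
    if (!col.isNullAt(i)) {
      sum += col.getInt(i)
    }
    i += 1
  }
  sum
}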

2. The current whole-stage codegen generates code for element-wise
selection (excluding sort and join). The SIMDization or GPUization
capability then depends on the compiler that translates the generated code
into native code.
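
Just to illustrate the shape of the problem (this is not the actual
generated code), the element-wise part boils down to a loop like the one
below, and SIMDization then depends on whether the JIT compiler
auto-vectorizes it.

// Illustration only: an element-wise filter + map over primitive arrays.
// Whole-stage codegen emits Java of roughly this shape; SIMDization then
// depends on the JIT compiler's auto-vectorization of the loop.
def filterAndScale(values: Array[Int], threshold: Int, factor: Int): Array[Int] = {
  val out = new Array[Int](values.length)
  var n = 0
  var i = 0
  while (i < values.length) {
    val v = values(i)
    if (v > threshold) {
      out(n) = v * factor
      n += 1
    }
    i += 1
  }
  java.util.Arrays.copyOf(out, n)
}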

3. The current Projection assumes row-oriented data; I think this is the
part that Wenchen pointed out.
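
For example, even when the scan is columnar, the projection path consumes
InternalRow. A minimal sketch of the row-by-row view that ColumnarBatch
exposes for that purpose:

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch: the projection stage works on InternalRow, so a columnar batch
// ends up being consumed row by row; downstream code typically copies each
// row into a row-oriented UnsafeRow before handing it to the next operator.
def toRows(batch: ColumnarBatch): Iterator[InternalRow] =
  batch.rowIterator().asScala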

My slides
https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
may summarize the above issues and a possible implementation.



FYI, NVIDIA will present an approach to exploiting GPUs with Arrow through
Python at SAIS 2019:
https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110
I think that it uses the Python UDF support with Arrow in Spark.

P.S. I will give a presentation about in-memory data storage for Spark at
SAIS 2019:
https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
:)

Kazuaki Ishizaki



From:   Wenchen Fan <cloud0...@gmail.com>
To:     Bobby Evans <bo...@apache.org>
Cc:     Spark dev list <dev@spark.apache.org>
Date:   2019/03/26 13:53
Subject:        Re: [DISCUSS] Spark Columnar Processing



Do you have some initial perf numbers? It seems fine to me to remain 
row-based inside Spark with whole-stage-codegen, and convert rows to 
columnar batches when communicating with external systems.

On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bo...@apache.org> wrote:
This thread is to discuss adding in support for data frame processing 
using an in-memory columnar format compatible with Apache Arrow.  My main 
goal in this is to lay the groundwork so we can add in support for GPU 
accelerated processing of data frames, but this feature has a number of 
other benefits.  Spark currently supports Apache Arrow formatted data as 
an option to exchange data with python for pandas UDF processing. There 
has also been discussion around extending this to allow for exchanging 
data with other tools like pytorch, tensorflow, xgboost,... If Spark 
supports processing on Arrow compatible data it could eliminate the 
serialization/deserialization overhead when going between these systems.  
It also would allow for doing optimizations on a CPU with SIMD 
instructions similar to what Hive currently supports. Accelerated 
processing using a GPU is something that we will start a separate 
discussion thread on, but I wanted to set the context a bit.
Jason Lowe, Tom Graves, and I created a prototype over the past few months 
to try and understand how to make this work.  What we are proposing is 
based off of lessons learned when building this prototype, but we really 
wanted to get feedback early on from the community. We will file a SPIP 
once we can get agreement that this is a good direction to go in.

The current support for columnar processing lets a Parquet or Orc file 
format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s 
type erasure. The code generation is aware that the RDD actually holds
ColumnarBatches and generates code to loop through the data in each batch
as InternalRows.
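
Roughly, the trick looks like this simplified sketch (not the actual scan
code):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Simplified sketch of the type-erasure trick: the RDD really contains
// ColumnarBatch objects but is exposed as RDD[InternalRow]; only the
// generated code knows to cast each element back to ColumnarBatch.
def eraseToRowRDD(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
  batches.asInstanceOf[RDD[InternalRow]]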

Instead, we propose a new set of APIs to work on an 
RDD[InternalColumnarBatch] instead of abusing type erasure. With this we 
propose adding in a Rule similar to how WholeStageCodeGen currently works. 
Each part of the physical SparkPlan would expose columnar support through 
a combination of traits and method calls. The rule would then decide when 
columnar processing would start and when it would end. Switching between 
columnar and row based processing is not free, so the rule would make a 
decision based off of an estimate of the cost to do the transformation and 
the estimated speedup in processing time. 
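
A rough sketch of the shape of the API we have in mind (the trait and rule
names here are hypothetical and just for illustration):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical trait an operator would mix in to advertise columnar
// support; names and methods are illustrative only, not an actual Spark API.
trait SupportsColumnar { self: SparkPlan =>
  def supportsColumnarExecution: Boolean
  def columnarTransitionCost: Double  // estimated cost of a row<->columnar switch
}

// Hypothetical rule: walk the physical plan and decide where columnar
// processing starts and ends, based on the estimated transition cost vs.
// the estimated speedup, inserting conversion nodes at the boundaries.
case class InsertColumnarTransitions(costThreshold: Double) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case p: SparkPlan with SupportsColumnar
        if p.supportsColumnarExecution && p.columnarTransitionCost < costThreshold =>
      p  // a real implementation would wrap with row-to-columnar / columnar-to-row nodes
  }
}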

This should allow us to disable columnar support by simply disabling the 
rule that modifies the physical SparkPlan.  It should be minimal risk to 
the existing row-based code path, as that code should not be touched, and 
in many cases could be reused to implement the columnar version.  This 
also allows for small easily manageable patches. No huge patches that no 
one wants to review.

As far as the memory layout is concerned OnHeapColumnVector and 
OffHeapColumnVector are already really close to being Apache Arrow 
compatible so shifting them over would be a relatively simple change. 
Alternatively we could add in a new implementation that is Arrow 
compatible if there are reasons to keep the old ones.
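
For reference, Spark already has an ArrowColumnVector that exposes an Arrow
ValueVector through the ColumnVector interface, so a minimal sketch of
building an Arrow-backed batch could look like:

import org.apache.arrow.vector.IntVector
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}

// Minimal sketch: wrap an already-populated Arrow IntVector so Spark can
// read it through the existing ColumnVector abstraction.
def arrowToBatch(vector: IntVector): ColumnarBatch = {
  val col: ColumnVector = new ArrowColumnVector(vector)
  val batch = new ColumnarBatch(Array(col))
  batch.setNumRows(vector.getValueCount)
  batch
}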

Again this is just to get the discussion started, any feedback is welcome, 
and we will file a SPIP on it once we feel like the major changes we are 
proposing are acceptable.

Thanks,

Bobby Evans

