This thread is to discuss adding support for data frame processing
using an in-memory columnar format compatible with Apache Arrow.  My
main goal is to lay the groundwork so we can add support for
GPU-accelerated processing of data frames, but this feature has a
number of other benefits.  Spark currently supports Apache Arrow
formatted data as an option to exchange data with Python for pandas
UDF processing.  There has also been discussion around extending this
to exchange data with other tools like PyTorch, TensorFlow, XGBoost,
etc.  If Spark supported processing directly on Arrow-compatible
data, it could eliminate the serialization/deserialization overhead
when moving between these systems.  It would also open the door to
CPU optimizations using SIMD instructions, similar to the vectorized
execution Hive currently supports.  GPU-accelerated processing is
something we will start a separate discussion thread on, but I wanted
to set the context a bit.

Jason Lowe, Tom Graves, and I built a prototype over the past few
months to understand how to make this work.  What we are proposing is
based on lessons learned from building that prototype, but we wanted
to get feedback from the community early on.  We will file a SPIP
once we have agreement that this is a good direction to go in.

The current support for columnar processing lets a Parquet or ORC
file format return a ColumnarBatch inside an RDD[InternalRow] by
exploiting type erasure.  The code generation is aware that the RDD
actually holds ColumnarBatches and generates code to loop through the
data in each batch as InternalRows.
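
To make the type-erasure trick concrete, here is a simplified sketch
(the helper asRows and its exact shape are illustrative, not Spark's
actual internals):

    import scala.collection.JavaConverters._

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // The scan really produces batches, but the declared element type
    // is InternalRow; the cast only works because generics are erased
    // at runtime.
    def asRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] = {
      val disguised = batches.asInstanceOf[RDD[InternalRow]]
      disguised.mapPartitions { iter =>
        iter.flatMap { elem =>
          // Generated code "knows" each element is really a whole
          // batch and walks it row by row.
          elem.asInstanceOf[ColumnarBatch].rowIterator().asScala
        }
      }
    }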

Instead, we propose a new set of APIs that work on an
RDD[InternalColumnarBatch] instead of abusing type erasure.  With
this we propose adding a Rule, similar to how WholeStageCodeGen
currently works.  Each part of the physical SparkPlan would expose
columnar support through a combination of traits and method calls.
The rule would then decide where columnar processing starts and where
it ends.  Switching between columnar and row-based processing is not
free, so the rule would decide based on an estimate of the cost of
the transitions and the estimated speedup in processing time.
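
As a rough illustration of the shape this could take (the names
ColumnarExec, doExecuteColumnar, and InsertColumnarTransitions are
hypothetical placeholders, and the sketch reuses the existing
ColumnarBatch class in place of the proposed InternalColumnarBatch):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Hypothetical trait an operator would mix in to advertise
    // columnar support.
    trait ColumnarExec { self: SparkPlan =>
      // Whether this operator has a columnar implementation at all.
      def supportsColumnar: Boolean = false

      // Columnar counterpart to doExecute(), producing batches
      // instead of rows.
      def doExecuteColumnar(): RDD[ColumnarBatch] =
        throw new UnsupportedOperationException(
          s"$nodeName does not support columnar execution")
    }

    // Hypothetical rule that decides where columnar sections begin
    // and end.  The cost model and the row/columnar transition
    // operators it would insert are elided; this only shows the
    // overall shape.
    case class InsertColumnarTransitions(worthIt: SparkPlan => Boolean)
        extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
        case p: SparkPlan with ColumnarExec
            if p.supportsColumnar && worthIt(p) =>
          p  // a real rule would insert transition operators here
      }
    }

Operators without a columnar implementation would keep the default
supportsColumnar = false, and the rule would route around them.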

This should allow us to disable columnar support simply by disabling
the rule that modifies the physical SparkPlan.  It should pose
minimal risk to the existing row-based code path, as that code would
not be touched, and in many cases could be reused to implement the
columnar version.  It also allows the work to land as small, easily
manageable patches rather than huge patches that no one wants to
review.

As far as the memory layout is concerned, OnHeapColumnVector and
OffHeapColumnVector are already very close to being Apache Arrow
compatible, so shifting them over would be a relatively simple
change.  Alternatively, we could add a new implementation that is
Arrow compatible if there are reasons to keep the old ones.
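
For reference, Spark already ships a read-only ArrowColumnVector on
the pandas UDF exchange path, which exposes an Arrow ValueVector
through the same ColumnVector interface that ColumnarBatch consumes.
A minimal sketch of wrapping Arrow-backed data this way (Arrow API
details may vary by version):

    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.IntVector
    import org.apache.spark.sql.vectorized.{ArrowColumnVector,
      ColumnVector, ColumnarBatch}

    // Build a small Arrow vector: [0, 10, 20].
    val allocator = new RootAllocator(Long.MaxValue)
    val ints = new IntVector("i", allocator)
    ints.allocateNew(3)
    (0 until 3).foreach(i => ints.setSafe(i, i * 10))
    ints.setValueCount(3)

    // Expose it to Spark without copying the underlying buffers.
    val col: ColumnVector = new ArrowColumnVector(ints)
    val batch = new ColumnarBatch(Array(col))
    batch.setNumRows(3)
    // batch.rowIterator() now walks the Arrow data as InternalRows.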

Again, this is just to get the discussion started.  Any feedback is
welcome, and we will file a SPIP once we feel the major changes we
are proposing are acceptable.

Thanks,

Bobby Evans
