Hi François,

Thank you very much for this detailed analysis! Since I am not that experienced
with the Arrow project, this helps me a lot: I don't have to invent everything
from my own imagination.

Addressing your comment:

> Having said that, I think I understand where you want to go. The
> FileFormat::ScanFile method has what you want without the overhead of
> the full dataset API. It acts as an interface to interact with file
> format paired with predicate pushdown and column selection options.

Yes, exactly. I have actually been basing my work on the FileFormat interface
for days. And your assumption is correct: I don't have to interact with the
DataSource(Discovery)/Dataset layer, because Spark already provides its own
solution. So my initial plan was to leave the DataSource/Dataset part out of my
work; similarly, the first of the "Cons" you listed doesn't bother me at this
time.

Meanwhile, I was thinking about making things more general for Arrow. Referring
to one of your suggestions:

> - Create a JNI binding to the previous helper and all the class
> dependencies to construct the parameters (FileSource, FileFormat,
> ScanOptions, ScanContext).

Assuming we expect these mirrored classes to exist in Java, I think what gets
written is essentially the foundation of a Java version of the file-based
Datasets API/framework. This is why I guessed things would end up as porting
(some of) the C++ Datasets API to Java, although I don't have to touch the
higher-level DataSource-related parts in my first development iteration. So
François, would you suggest filing a JIRA for "Implement file-based Datasets
scan in Java" or something similar? I believe the functionality would help a
lot for projects that already depend on Arrow or are evaluating it.

As for DataSource/Dataset, I also have the feeling that they are not designed
for distributed computing. Especially in SQL, they currently look like black
boxes to the query planner, so some optimizations cannot be done easily on top
of them. Thus I suppose my problem with using Arrow could be a wider problem:
not only Spark, but a lot of data query systems have their own metadata/catalog
implementations. For them, the best way to integrate Arrow readers might be
similar to what I am doing now.

So I think the C++ helper method you suggested,
`Result<Iterator<RecordBatch>> ScanFile(FileSource source, FileFormat& fmt,
std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)`,
is a good solution for this problem. I am not sure whether we could go further,
e.g. by creating a new "SingleFileDataSource" or something, of course with no
partitioning support there.
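
To make the shape concrete, here is a rough Java-side sketch of the data such a
single-file scan would need. This is only an illustration under my own
assumptions: `SingleFileScanRequest` and its fields are hypothetical names, not
existing Arrow classes, and the filter is kept as opaque bytes because the
expression translation is still open. The point is just that path, columns and
filter are plain data that can be shipped to an executor before any JNI call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical value object (not an Arrow class): the inputs of one file scan.
final class SingleFileScanRequest {
    final String path;           // file to scan
    final List<String> columns;  // projected column names
    final byte[] filter;         // encoded filter expression; encoding undecided

    SingleFileScanRequest(String path, List<String> columns, byte[] filter) {
        this.path = path;
        this.columns = new ArrayList<>(columns);
        this.filter = filter.clone();
    }

    // Flatten to bytes so the request can be shipped from driver to executor.
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(path);
            out.writeInt(columns.size());
            for (String column : columns) {
                out.writeUTF(column);
            }
            out.writeInt(filter.length);
            out.write(filter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuild the request on the receiving side, right before the JNI call.
    static SingleFileScanRequest fromBytes(byte[] bytes) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(bytes))) {
            String path = in.readUTF();
            int columnCount = in.readInt();
            List<String> columns = new ArrayList<>();
            for (int i = 0; i < columnCount; i++) {
                columns.add(in.readUTF());
            }
            byte[] filter = new byte[in.readInt()];
            in.readFully(filter);
            return new SingleFileScanRequest(path, columns, filter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

I used a hand-written byte layout rather than Java serialization only to keep
the sketch small; the real encoding would have to be agreed on with the C++
side.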

And:
> This is where it gets cumbersome, ScanOptions has Expression which may
> not be easy to build ad-hoc. FileSource needs a fs::Filesystem,
> ScanContext needs a MemoryPool, etc... You may hide this via helper
> methods, this is what the R binding is doing.

Thanks, this is definitely a very important point. Datasets now has its own
Filter expression structures (and thanks for referencing ARROW-6953, but it
seems the work has not started yet?). The Java-side work would need to include
a Java port of these structures, which looks like a lot of work. Regarding
MemoryPool and FileSystem, I would try to manage the instances in C++ and pass
some constant flags via JNI if needed. Would this be a feasible approach?
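
By "constant flags" I mean roughly the following. This is a minimal sketch
under my own assumptions: the enum names, the ids, and the `scanFile` native
method are all hypothetical, and no such binding exists yet. The idea is that
the C++ side creates and owns the FileSystem and MemoryPool instances, and Java
only passes small integer ids that select one of them.

```java
// Hypothetical ids, for illustration only; the C++ side would map each id
// to a FileSystem instance that it creates and owns.
enum FileSystemKind {
    LOCAL(0),
    HDFS(1);

    final int id;

    FileSystemKind(int id) {
        this.id = id;
    }

    // Reverse lookup, e.g. when an id comes back from configuration.
    static FileSystemKind fromId(int id) {
        for (FileSystemKind kind : values()) {
            if (kind.id == id) {
                return kind;
            }
        }
        throw new IllegalArgumentException("Unknown file system id: " + id);
    }
}

// Hypothetical ids for memory pools managed on the C++ side.
enum MemoryPoolKind {
    DEFAULT(0);

    final int id;

    MemoryPoolKind(int id) {
        this.id = id;
    }
}

// Hypothetical JNI entry point; declared here only to show the signature.
final class NativeFileScanner {
    // Would return an opaque handle to a native record batch iterator.
    static native long scanFile(String path, String[] columns, byte[] filter,
                                int fileSystemId, int memoryPoolId);

    // Typed wrapper so callers never deal with raw ids.
    static long scan(String path, String[] columns, byte[] filter,
                     FileSystemKind fileSystem, MemoryPoolKind pool) {
        return scanFile(path, columns, filter, fileSystem.id, pool.id);
    }
}
```

With this layout no C++ object other than the iterator handle crosses the JNI
boundary, which I hope keeps the binding code small.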

Thanks,
Hongze




On Wed, 2019-11-27 at 16:08 -0500, Francois Saint-Jacques wrote:
> Hello Hongze,
> 
> The C++ implementation of dataset, notably Dataset, DataSource,
> DataSourceDiscovery, and Scanner classes are not ready/designed for
> distributed computing. They don't serialize and they reference by
> pointer all around, thus I highly doubt that you can implement parts
> in Java, and some in C++ with minimal effort and complexity. You can
> think of Dataset/DataSource as similar to the Hive Metastore, but
> locally (single node) and in-memory. I fail to see how one could use
> it with the execution model of spark, e.g. construct all the manifests
> on the driver via Dataset, Scanner and pass the ScanTask to executors
> due to previous limitations. One cannot construct a ScanTask out of
> thin air, it needs a DataFragment (or FileFormat in case of
> FileDataFragment).
> 
> Having said that, I think I understand where you want to go. The
> FileFormat::ScanFile method has what you want without the overhead of
> the full dataset API. It acts as an interface to interact with file
> format paired with predicate pushdown and column selection options.
> This is where I would start:
> 
> - Create a JNI bridge between a C++ RecordBatch and Java VectorSchemaRoot [1]
> - Create a C++ helper method `Result<Iterator<RecordBatch>>
> ScanFile(FileSource source, FileFormat& fmt,
> std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext>
> context)`
> The goal of this method is similar to `Scanner::ToTable`, i.e. hide
> the local scheduling details of ScanTask. Thus you don't need to
> expose ScanTask.
> - Create a JNI binding to the previous helper and all the class
> dependencies to construct the parameters (FileSource, FileFormat,
> ScanOptions, ScanContext).
> This is where it gets cumbersome, ScanOptions has Expression which may
> not be easy to build ad-hoc. FileSource needs a fs::Filesystem,
> ScanContext needs a MemoryPool, etc... You may hide this via helper
> methods, this is what the R binding is doing.
> 
> Your PoC can probably get away with a trivial
> `Result<Iterator<RecordBatch>> ScanParquetFile(std::string path, Expr&
> filter, std::vector<std::string> columns)` without exposing all the
> details and using the "defaults". Thus you only need to wrap a method
> (ScanParquetFile) and Expression in your JNI bridge.
> 
> Pros:
> - Access to native file readers with uniform predicate pushdown (when
> the file format supports it), and column selection options. Filtering
> done natively in C++.
> - Enable usage of said points in distributed computing, since the only
> passed information are: the path, the expression (will need a
> translation), and the list of columns. All of which are tractable to
> serialize.
> - Bonus, you may even get transparent access to gandiva [2]
> 
> Cons:
> - No predicate pushdown on file partition, e.g. extracted from path
> because this information is in the DataSource
> - ScanOptions is built by ScannerBuilder, there's a lot of validation
> hidden under the hood via DataSource, DataSourceDiscovery and
> ScannerBuilder. It's easy to get an error with a malformed
> ScanOptions.
> - No access to non-file DataSource, e.g. in the future we might have
> OdbcDataSource and FlightDataSource
> 
> Basically, dataset::FileFormat is meant to be a unified interface to
> interact with file formats. Here's an example of such usage without
> all the dataset machinery [3].
> 
> François
> 
> [1] https://issues.apache.org/jira/browse/ARROW-7272
> [2] https://issues.apache.org/jira/browse/ARROW-6953
> [3] 
> https://github.com/apache/arrow/blob/61c8b1b80039119d5905660289dd53a3130ce898/cpp/src/arrow/dataset/file_parquet_test.cc#L345-L393
> 
> On Wed, Nov 27, 2019 at 5:17 AM Hongze Zhang <notify...@126.com> wrote:
> > Hi Micah,
> > 
> > 
> > Regarding our use cases, we'd use the API on Parquet files with some pushed 
> > filters and
> > projectors, and we'd extend the C++ Datasets code to provide necessary 
> > support for our own data
> > formats.
> > 
> > 
> > > If JNI is seen as too cumbersome, another possible avenue to pursue is
> > > writing a gRPC wrapper around the DataSet metadata capabilities.  One 
> > > could
> > > then create a facade on top of that for Java.  For data reads, I can see
> > > either building a Flight server or directly use the JNI readers.
> > 
> > Thanks for your suggestion but I'm not entirely getting it. Does this mean 
> > to start some
> > individual gRPC/Flight server process to deal with the metadata/data 
> > exchange problem between
> > Java and C++ Datasets? If yes, then in some cases, doesn't it easily 
> > introduce bigger problems
> > about life cycle and resource management of the processes? Please correct 
> > me if I misunderstood
> > your point.
> > 
> > 
> > And IMHO I don't strongly hate the possible inconsistencies and bugs brought 
> > by a Java porting of
> > something like the Datasets framework. Inconsistencies are usually in a way 
> > inevitable between
> > two different languages' implementations of the same component, but there 
> > is supposed to be a
> > trade-off based on whether the implementations are worth providing. I 
> > didn't have chance
> > to fully investigate the requirements of Datasets-Java from other projects 
> > so I'm not 100% sure
> > but the functionality such as source discovery, predicate pushdown, 
> > multi-format support could
> > be powerful for many scenarios. Anyway I'm totally with you that the work 
> > amount could be huge
> > and bugs might be brought. So my goal is to start from a small piece of the 
> > APIs to minimize the
> > initial work. What do you think?
> > 
> > 
> > Thanks,
> > Hongze
> > 
> > 
> > 
> > At 2019-11-27 16:00:35, "Micah Kornfield" <emkornfi...@gmail.com> wrote:
> > > Hi Hongze,
> > > I have a strong preference for not porting non-trivial logic from one
> > > language to another, especially if the main goal is performance.  I think
> > > this will replicate bugs and cause confusion if inconsistencies occur.  It
> > > is also a non-trivial amount of work to develop, review, setup CI, etc.
> > > 
> > > If JNI is seen as too cumbersome, another possible avenue to pursue is
> > > writing a gRPC wrapper around the DataSet metadata capabilities.  One 
> > > could
> > > then create a facade on top of that for Java.  For data reads, I can see
> > > either building a Flight server or directly use the JNI readers.
> > > 
> > > In either case this is a non-trivial amount of work, so I at least,
> > > would appreciate a short write-up (1-2 pages) explicitly stating
> > > goals/use-cases for the library and a high level design (component 
> > > overview
> > > and relationships between components and how it will co-exist with 
> > > existing
> > > Java code).  If I understand correctly, one goal is to use this as a basis
> > > for a new Spark DataSet API with better performance than the vectorized
> > > spark parquet reader?  Are there others?
> > > 
> > > Wes, what are your thoughts on this?
> > > 
> > > Thanks,
> > > Micah
> > > 
> > > 
> > > On Tue, Nov 26, 2019 at 10:51 PM Hongze Zhang <notify...@126.com> wrote:
> > > 
> > > > Hi Wes and Micah,
> > > > 
> > > > 
> > > > Thanks for your kind reply.
> > > > 
> > > > 
> > > > Micah: We don't use Spark (vectorized) parquet reader because it is a 
> > > > pure
> > > > Java implementation. Performance could be worse than doing the similar 
> > > > work
> > > > natively. Another reason is we may need to
> > > > integrate some other specific data sources with Arrow datasets, for
> > > > limiting the workload, we would like to maintain a common read pipeline 
> > > > for
> > > > both this one and other widely used data sources like Parquet and CSV.
> > > > 
> > > > 
> > > > Wes: Yes, Datasets framework along with Parquet/CSV/... reader
> > > > implementations are totally native, So a JNI bridge will be needed then 
> > > > we
> > > > don't actually read files in Java.
> > > > 
> > > > 
> > > > My other concern is how many C++ datasets components should be bridged
> > > > via JNI. For example,
> > > > bridge the ScanTask only? Or bridge more components including Scanner,
> > > > Table, even the DataSource
> > > > discovery system? Or just bridge the C++ arrow Parquet, Orc readers (as
> > > > Micah said, orc-jni is
> > > > already there) and reimplement everything needed by datasets in Java? 
> > > > This
> > > > might be not that easy to
> > > > decide but currently based on my limited perspective I would prefer to 
> > > > get
> > > > started from the ScanTask
> > > > layer as a result we could leverage some valuable work finished in C++
> > > > datasets and don't have to
> > > > maintain too much tedious JNI code. The real IO process still take place
> > > > inside C++ readers when we
> > > > do scan operation.
> > > > 
> > > > 
> > > > So Wes, Micah, is this similar to your consideration?
> > > > 
> > > > 
> > > > Thanks,
> > > > Hongze
> > > > 
> > > > At 2019-11-27 12:39:52, "Micah Kornfield" <emkornfi...@gmail.com> wrote:
> > > > > Hi Hongze,
> > > > > To add to Wes's point, there are already some efforts to do JNI for 
> > > > > ORC
> > > > > (which needs to be integrated with CI) and some open PRs for Parquet 
> > > > > in
> > > > the
> > > > > project.  However, given that you are using Spark I would expect 
> > > > > there is
> > > > > already dataset functionality that is equivalent to the dataset API 
> > > > > to do
> > > > > rowgroup/partition level filtering.  Can you elaborate on what 
> > > > > problems
> > > > you
> > > > > are seeing with those and what additional use cases you have?
> > > > > 
> > > > > Thanks,
> > > > > Micah
> > > > > 
> > > > > 
> > > > > On Tue, Nov 26, 2019 at 1:10 PM Wes McKinney <wesmck...@gmail.com> 
> > > > > wrote:
> > > > > 
> > > > > > hi Hongze,
> > > > > > 
> > > > > > The Datasets functionality is indeed extremely useful, and it may 
> > > > > > make
> > > > > > sense to have it available in many languages eventually. With Java, 
> > > > > > I
> > > > > > would raise the issue that things are comparatively weaker there 
> > > > > > when
> > > > > > it comes to actually reading the files themselves. Whereas we have
> > > > > > reasonably fast Arrow-based interfaces to CSV, JSON, ORC, and 
> > > > > > Parquet
> > > > > > in C++ the same is not true in Java. Not a deal breaker but worth
> > > > > > taking into consideration.
> > > > > > 
> > > > > > I wonder aloud whether it might be worth investing in a JNI-based
> > > > > > interface to the C++ libraries as one potential approach to save on
> > > > > > development time.
> > > > > > 
> > > > > > - Wes
> > > > > > 
> > > > > > 
> > > > > > 
> > > > > > On Tue, Nov 26, 2019 at 5:54 AM Hongze Zhang <notify...@126.com> 
> > > > > > wrote:
> > > > > > > Hi all,
> > > > > > > 
> > > > > > > 
> > > > > > > Recently the datasets API has been improved a lot and I found 
> > > > > > > some of
> > > > > > the new features are very useful to my own work. For example, to me an
> > > > > > important one is the fix of ARROW-6952[1]. And as I currently work 
> > > > > > on
> > > > > > Java/Scala projects like Spark, I am now investigating a way to call
> > > > some
> > > > > > of the datasets APIs in Java so that I could gain performance
> > > > improvement
> > > > > > from native dataset filters/projectors. Meantime I am also 
> > > > > > interested in
> > > > > > the ability of scanning different data sources provided by dataset 
> > > > > > API.
> > > > > > > 
> > > > > > > Regarding using datasets in Java, my initial idea is to port (by
> > > > writing
> > > > > > Java-version implementations) some of the high-level concepts in 
> > > > > > Java
> > > > such
> > > > > > as DataSourceDiscovery/DataSet/Scanner/FileFormat, then create and 
> > > > > > call
> > > > > > lower level record batch iterators via JNI. This way we seem to 
> > > > > > retain
> > > > > > performance advantages from c++ dataset code.
> > > > > > > 
> > > > > > > Is anyone interested in this topic also? Or is this something 
> > > > > > > already
> > > > on
> > > > > > the development plan? Any feedback or thoughts would be much
> > > > appreciated.
> > > > > > > 
> > > > > > > Best,
> > > > > > > Hongze
> > > > > > > 
> > > > > > > 
> > > > > > > [1] https://issues.apache.org/jira/browse/ARROW-6952
