codope commented on a change in pull request #3957:
URL: https://github.com/apache/hudi/pull/3957#discussion_r794432109



##########
File path: rfc/rfc-40/rfc-40.md
##########
@@ -0,0 +1,195 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# RFC-40: Hudi Connector for Trino
+
+## Proposers
+
+- @codope
+
+## Approvers
+
+- @bvaradar
+- @vinothchandar
+
+## Status
+
+JIRA: https://issues.apache.org/jira/browse/HUDI-2687
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Today, Hudi supports snapshot queries on Copy-On-Write (COW) tables and read-optimized queries on Merge-On-Read (MOR)
+tables with Trino, through the input-format-based integration in the Hive connector. This approach has known performance
+limitations with very large tables. Moreover, as Hudi keeps getting better, a new plugin to provide access to Hudi data
+and metadata will help unlock capabilities such as metadata-based listing and full schema evolution for Trino users.
+A separate Hudi connector would also allow it to evolve independently, without having to worry about hacking or
+breaking the Hive connector. A separate connector also falls in line with our vision of a standalone timeline server
+or a lake cache to balance the tradeoff between writing and querying.
+
+## Background
+
+The current Trino integration relies on a custom annotation `@UseFileSplitsFromInputFormat`. Any input format that has
+this annotation fetches splits by invoking the corresponding input format's `getSplits()` method instead of Trino's
+Hive connector's native split loading logic. For instance, for realtime queries on Hudi tables via Trino, this is
+simply a call to `HoodieParquetRealtimeInputFormat.getSplits()`. This approach has known performance limitations
+because of the redundant Hudi table metadata listing while loading splits. This issue has been fixed to some extent in
+Presto, and the work to upstream those changes to Trino is [in progress](https://github.com/trinodb/trino/pull/9641).
+
+A connector enables Trino to communicate with external data sources. The connector interface is composed of four parts:
+the Metadata API, Data Location API, Data Source API, and Data Sink API. These APIs are designed to allow performant
+implementations of connectors within the environment of Trino's distributed execution engine. For an overview of the
+Trino architecture, please see [Trino concepts](https://trino.io/docs/current/overview/concepts.html).
+
+### Trino query execution model
+
+When Trino executes a query, it does so by breaking up the execution into a hierarchy of **stages**. A single stage is
+implemented as a series of **tasks** distributed over a network of Trino workers. Tasks operate on **splits**, which are
+partitions of a larger data set. Tasks at the source stage produce data in the form of **pages**, each of which is a
+collection of rows in columnar format. These pages flow to other intermediate downstream stages.
+
+## Implementation
+
+Trino provides a service provider interface (SPI), which is a type of API used to implement a connector. By implementing
+the SPI in a connector, Trino can use standard operations internally to connect to any data source and perform
+operations on any data source. The connector takes care of the details relevant to the specific data source.
+
+Hudi connector will implement three parts of the API:
+
+- Operations to fetch table/view/schema metadata.
+- Operations to produce logical units of data partitioning, so that Trino can parallelize reads and writes.
+- Data sources and sinks that convert the source data to/from the in-memory format expected by the query engine.
+
+Hudi connector will be registered as a plugin, which will be loaded by the Trino server at startup. The entry point will
+be `HudiPlugin`, an implementation of the `Plugin` interface. Instances of the Hudi connector are created by a
+`ConnectorFactory` instance, which is obtained when Trino calls `getConnectorFactories()` on the plugin.
+A class-diagrammatic view of the different components is shown below.
+![](Hudi_Connector.png)
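+
+As a rough sketch (not the final implementation), the plugin entry point could look like the following, assuming the
+Trino SPI `Plugin` and `ConnectorFactory` interfaces; the `HudiConnectorFactory` class and the `"hudi"` connector name
+are illustrative:
+
+```java
+public class HudiPlugin
+    implements Plugin {
+  @Override
+  public Iterable<ConnectorFactory> getConnectorFactories() {
+    // Expose a single factory; Trino instantiates one connector per configured catalog.
+    return List.of(new HudiConnectorFactory("hudi"));
+  }
+}
+```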
+
+### Operations to fetch table/view/schema metadata
+
+The `ConnectorMetadata` interface provides important methods that are responsible for allowing Trino to look at lists of
+schemas, lists of tables, lists of columns, and other metadata about a particular data source. The implementation of
+this interface will create the `HoodieTableMetaClient` and pass it to the connector table handle through which Trino
+can access the metadata of a Hudi table.
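+
+For illustration only, the connector table handle could carry the table identity along with information derived from
+`HoodieTableMetaClient`; the `HudiTableHandle` name and its fields below are assumptions, not the final shape:
+
+```java
+public class HudiTableHandle
+    implements ConnectorTableHandle {
+  private final String schemaName;
+  private final String tableName;
+  private final String basePath;            // Hudi table base path on storage
+  private final HoodieTableType tableType;  // COPY_ON_WRITE or MERGE_ON_READ
+  private final TupleDomain<HiveColumnHandle> predicate;
+}
+```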
+
+
+### Operations to produce logical units of data partitioning
+
+We will need to implement the `ConnectorSplit` and `ConnectorSplitManager` interfaces. Hudi splits will be similar to
+how the Hive connector describes splits, in the form of a path to a file with an offset and length that indicate which
+part of the file needs to be processed.
+
+```java
+public class HudiSplit
+    implements ConnectorSplit {
+  private final String path;        // path of the base file to read
+  private final long start;         // byte offset at which this split begins
+  private final long length;        // number of bytes covered by this split
+  private final long fileSize;      // total size of the underlying file
+  private final List<HostAddress> addresses;                // preferred hosts for scheduling
+  private final TupleDomain<HiveColumnHandle> predicate;    // pushed-down predicate
+  private final List<HivePartitionKey> partitionKeys;       // partition column values for this split
+}
+```
+
+The split manager will partition the data for a table into the individual chunks that Trino will distribute to workers
+for processing. This is where the partition loader logic will reside. While listing the files for each Hudi partition,
+the split manager will create one or more splits per file. For a non-partitioned table, the split manager will simply
+return a single split for the entire table.
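+
+A simplified sketch of how the split manager could turn the base files of a partition into splits is shown below; the
+helper name, the `HudiSplit` constructor order, and the one-split-per-file policy are assumptions for illustration:
+
+```java
+private List<HudiSplit> createSplitsForPartition(List<HoodieBaseFile> baseFiles,
+    List<HivePartitionKey> partitionKeys, TupleDomain<HiveColumnHandle> predicate) {
+  List<HudiSplit> splits = new ArrayList<>();
+  for (HoodieBaseFile baseFile : baseFiles) {
+    // One split spanning the whole file; large files could instead be cut into block-aligned ranges.
+    splits.add(new HudiSplit(baseFile.getPath(), 0L, baseFile.getFileLen(), baseFile.getFileLen(),
+        List.of(), predicate, partitionKeys));
+  }
+  return splits;
+}
+```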
+
+During query execution, the Trino coordinator tracks all splits available for processing and the locations where tasks
+are running on workers and processing splits. As tasks finish processing and produce more splits for downstream
+processing, the coordinator continues to schedule tasks until no splits remain for processing. Once all splits are
+processed on the workers, all data is available, and the coordinator can make the result available to the client.
+
+### Data source
+
+As mentioned in the query execution model, tasks in the source stage produce data in the form of pages. The Connector
+Data Source API returns pages when it is passed a split, and operators typically consume input pages, perform
+computation, and produce output pages. This is where we will implement the `ConnectorPageSourceProvider` interface to
+create the page source. We could have different page sources for different formats such as parquet, orc and avro. For
+the data source, we plan to reuse the `ParquetPageSource` in the Hive connector. This has the advantage of using Trino's
+custom `ParquetReader`, which can efficiently skip data sections by using statistics in file headers/footers. This is
+also where we will handle the column projections and build predicates for the parquet reader.
+
+```java
+public class HudiPageSourceProvider
+    implements ConnectorPageSourceProvider {
+  private final HdfsEnvironment hdfsEnvironment;                      // access to HDFS/object-store file systems
+  private final FileFormatDataSourceStats fileFormatDataSourceStats;  // read statistics for monitoring
+  private final ParquetReaderOptions parquetReaderOptions;            // tuning options for Trino's parquet reader
+  private final DateTimeZone timeZone;                                // time zone used for timestamp conversions
+}
+```
+In summary, the Trino coordinator uses the metadata and split manager APIs to gather information about the table and
+partitions to generate a query plan and logical splits of the table contents. Each split is processed by a task in a
+Trino worker. Here, workers invoke the page source APIs as tasks produce data in the form of pages.
+Subsequently, the native (parquet) reader reads the blocks of pages while executing the query.
+
+### Improving split parallelism
+
+In order to improve split parallelism, we will push the fetch of partitions down to the split source.
+Since each partition is independent of the others, we batch them up and each batch is processed by a thread.
+Additionally, the full list of partition names is fetched just once from the metastore, and the subsequent
+construction of the partition key from the partition name can be done in parallel as well.
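+
+A minimal sketch of this batching, assuming a fixed-size thread pool and hypothetical helpers
+`listPartitionNamesFromMetastore()` and `loadSplitsForPartitions()`; the batch size and parallelism are illustrative:
+
+```java
+int splitLoaderParallelism = 16;                                   // hypothetical parallelism
+List<String> partitionNames = listPartitionNamesFromMetastore();   // fetched once from the metastore
+int batchSize = 128;                                               // hypothetical batch size
+ExecutorService executor = Executors.newFixedThreadPool(splitLoaderParallelism);
+List<Future<List<HudiSplit>>> futures = new ArrayList<>();
+for (int i = 0; i < partitionNames.size(); i += batchSize) {
+  List<String> batch = partitionNames.subList(i, Math.min(i + batchSize, partitionNames.size()));
+  // Each batch of partitions is listed and turned into splits on its own thread.
+  futures.add(executor.submit(() -> loadSplitsForPartitions(batch)));
+}
+```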
+
+### Improving listing
+In order to improve listing, we assume that the path exists,
+and so we bypass the `FileSystem#exists` check in `AbstractHoodieTableFileSystemView` while fetching the latest base files.
+The connector will also support metadata-based listing, which will retrieve partition listings from Hudi's internal metadata table.
+This should further help improve performance.
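+
+A hedged sketch of enabling metadata-based listing when constructing the file system view, given an existing
+`HoodieEngineContext` and `HoodieTableMetaClient` in scope; the exact builder and factory method names may differ
+across Hudi versions and are an assumption here:
+
+```java
+// Enable reads of partition/file listings from Hudi's internal metadata table (assumed API).
+HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+    .enable(true)
+    .build();
+HoodieTableFileSystemView fileSystemView =
+    FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig);
+```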
+
+### Snapshot queries on MOR table
+
+This requires merging the base file and log files.
+One way is to use the `HoodieRealtimeRecordReader`, which can do compacted reading.
+However, this means we will have to give up Trino's optimized parquet reader.
+Another way is to enumerate the merged splits and use the native reader.
+This can be done in `HoodieRealtimeInputFormatUtils#getRealtimeSplits()`, which is invoked in `HoodieParquetRealtimeInputFormat`.
+We can reuse this logic for reading MOR tables via the connector.
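+
+If we go the native-reader route, the split for a MOR snapshot read could additionally carry the log files of its file
+group, so that the page source can merge delta records on top of the base file. The class and field names below are
+purely illustrative:
+
+```java
+public class HudiMorSnapshotSplit
+    implements ConnectorSplit {
+  private final String basePath;            // base parquet file; may be absent for log-only file groups
+  private final List<String> logFilePaths;  // delta log files to merge, in commit order
+  private final String latestInstantTime;   // instant up to which log records are merged
+}
+```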
+
+## Rollout/Adoption Plan
+
+- What impact (if any) will there be on existing users?
+
+There will be no impact on existing users because this is a new connector. It does not change the behavior of the
+current integration through the existing Hive connector. It gives users more choice.
+
+- What do we lose if we move away from the Hive connector?
+
+Hive connector takes advantage of [caching](https://trino.io/docs/current/connector/hive-caching.html) to reduce load on

Review comment:
       Yes. Will add a link in the rfc as well.
   HUDI-3339



