JingsongLi commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r429795438
########## File path: docs/dev/table/connectors/filesystem.md ##########
@@ -0,0 +1,352 @@

---
title: "Hadoop FileSystem Connector"
nav-title: Hadoop FileSystem
nav-parent_id: connectors
nav-pos: 5
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

* This will be replaced by the TOC
{:toc}

This connector provides access to partitioned files in filesystems
supported by the [Flink FileSystem abstraction]({{ site.baseurl }}/ops/filesystems/index.html).

The file system connector itself is included in Flink and does not require an additional dependency.
A corresponding format needs to be specified for reading rows from and writing rows to a file system.

The file system connector allows reading from and writing to a local or distributed filesystem. A filesystem table can be defined as:

<div class="codetabs" markdown="1">
<div data-lang="DDL" markdown="1">
{% highlight sql %}
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',           -- required: specify the connector type
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = '...',                     -- required: the file system connector requires a format,
                                        -- please refer to the File Formats section
                                        -- below for more details
  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic
                                        -- partition column value is null or an empty string

  -- optional: whether to shuffle data by the dynamic partition fields in the sink phase. This can
  -- greatly reduce the number of files written by the filesystem sink, but may lead to data skew.
  -- Disabled by default.
  'sink.shuffle-by-partition.enable' = '...',
  ...
)
{% endhighlight %}
</div>
</div>

<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/internals/filesystems.html).

<span class="label label-danger">Attention</span> File system sources for streaming are still experimental. In the future, we will support actual streaming use cases, i.e., partition and directory monitoring.
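To make the options above concrete, here is a minimal sketch. The table name `FsTable`, the path `file:///tmp/fs_table`, and the `Orders` source table are hypothetical, and `INSERT OVERWRITE` is typically a batch-mode operation. The sketch defines a partitioned filesystem table with the csv format, overwrites a single partition, and reads it back:

{% highlight sql %}
-- Hypothetical example: a filesystem table partitioned by dt and hour,
-- stored as csv files under a local directory.
CREATE TABLE FsTable (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING                     -- back-quoted because HOUR is a reserved keyword
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/fs_table',  -- hypothetical local directory
  'format' = 'csv',
  'sink.shuffle-by-partition.enable' = 'true'
);

-- Write a static partition. INSERT OVERWRITE replaces only the
-- partitions that receive data, never the entire table.
INSERT OVERWRITE FsTable PARTITION (dt = '2020-05-20', `hour` = '12')
SELECT user_id, order_amount FROM Orders;

-- A filter on the partition columns only needs to scan the matching
-- partition directories.
SELECT * FROM FsTable WHERE dt = '2020-05-20' AND `hour` = '12';
{% endhighlight %}

Enabling `sink.shuffle-by-partition.enable` shuffles rows by the dynamic partition fields before they reach the sink, which reduces the number of files per partition at the cost of possible data skew; the partition semantics themselves are described in the next section.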
## Partition files

Partitioning in the file system connector is similar to Hive partitioning. But unlike Hive, which manages partitions through the catalog, a file system table manages partitions according to the directory structure of the file system. The file system connector discovers and infers partitions automatically. For example, a table partitioned by datetime and hour has the following structure in the file system path:

```
path
└── datetime=2019-08-25
    └── hour=11
        ├── part-0.parquet
        ├── part-1.parquet
    └── hour=12
        ├── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        ├── part-0.parquet
```

The file system table supports both partition insert and overwrite insert. See the [INSERT Statement]({{ site.baseurl }}/table/sql/insert.html).

**NOTE:** When you insert overwrite into a partitioned table, only the corresponding partitions are overwritten, not the entire table.

## File Formats

The file system connector supports multiple formats:

 - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
 - JSON: Note that the JSON format for the file system connector is not a typical JSON file but [newline-delimited JSON](http://jsonlines.org/). Uncompressed.
 - Avro: [Apache Avro](http://avro.apache.org). Supports compression by configuring `avro.codec`.
 - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
 - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.

## Streaming sink

The file system connector supports streaming writes. It uses the [Streaming File Sink]({{ site.baseurl }}/connectors/streamfile_sink.html) to write records to files. Row-encoded formats are csv and json; bulk-encoded formats are parquet, orc and avro.

### Rolling policy

Data within the partition directories is split into part files. Each partition will contain at least one part file for each subtask of the sink that has received data for that partition. The in-progress part file is closed, and an additional part file is created, according to the configurable rolling policy. The policy rolls part files based on size and on a timeout that specifies the maximum duration for which a file can stay open.

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left" style="width: 20%">Key</th>
      <th class="text-left" style="width: 15%">Default</th>
      <th class="text-left" style="width: 10%">Type</th>
      <th class="text-left" style="width: 55%">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><h5>sink.rolling-policy.file-size</h5></td>
      <td style="word-wrap: break-word;">1024L * 1024L * 128L</td>
      <td>Long</td>
      <td>The maximum part file size before rolling.</td>
    </tr>
    <tr>
      <td><h5>sink.rolling-policy.time-interval</h5></td>
      <td style="word-wrap: break-word;">30 m</td>
      <td>Duration</td>
      <td>The maximum duration a part file can stay open before rolling (30 minutes by default, to avoid too many small files).</td>
    </tr>
  </tbody>
</table>

**NOTE:** For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval (pending files become finished on the next checkpoint) controls the size and number of part files.

Review comment:
In https://issues.apache.org/jira/browse/FLINK-11395, because the Avro file writer keeps its metadata in the file header and supports compression, we implemented it as a bulk writer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org