lirui-apache commented on a change in pull request #12537:
URL: https://github.com/apache/flink/pull/12537#discussion_r438591164
##########
File path: docs/dev/table/hive/hive_streaming.md
##########
@@ -0,0 +1,164 @@
+---
+title: "Hive Streaming"
+nav-parent_id: hive_tableapi
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+A typical Hive job is scheduled to run periodically, which introduces high latency.
+
+Flink supports writing to, reading from, and joining Hive tables in a streaming fashion.
+
+* This will be replaced by the TOC
+{:toc}
+
+There are three streaming scenarios:
+
+- Writing streaming data into Hive tables.
+- Reading Hive tables incrementally as a stream.
+- Joining a streaming table with a Hive table using a [Temporal Table]({{ site.baseurl }}/dev/table/streaming/temporal_tables.html#temporal-table).
+
+## Streaming Writing
+
+Hive tables support streaming writes, based on the [Filesystem Streaming Sink]({{ site.baseurl }}/dev/table/connectors/filesystem.html#streaming-sink).
+
+The Hive streaming sink reuses the filesystem streaming sink and integrates Hadoop OutputFormat/RecordWriter for streaming writes.
+Hadoop RecordWriters are bulk-encoded formats, and bulk formats roll files on every checkpoint.
+
+By default, only a renaming committer is available, which means the S3 filesystem cannot support exactly-once semantics.
+If you want to use the Hive streaming sink with the S3 filesystem, you can configure the following parameter
+in `TableConfig` (note that it affects all sinks of the job):
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.fallback-mapred-writer</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>If false, use Flink's native writers to write parquet and orc files; if true, use the Hadoop mapred record writers to write parquet and orc files.</td>
+    </tr>
+  </tbody>
+</table>
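+
+For example (a minimal sketch, assuming the SQL client forwards `SET` values into
+the `TableConfig`, the same mechanism used for the `table.sql-dialect` switches in
+the example below), you could opt into Flink's native writers with:
+
+{% highlight sql %}
+-- hypothetical: use Flink's native parquet/orc writers instead of
+-- the Hadoop mapred record writers
+SET table.exec.hive.fallback-mapred-writer=false;
+{% endhighlight %}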
+
+The example below shows how the streaming sink can be used in a streaming query to
+write data from Kafka into a Hive table with partition commit, and then read that
+data back out with a batch query.
+
+{% highlight sql %}
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hour STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  log_ts TIMESTAMP(3),
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' AND hour='12';
+
+{% endhighlight %}
+
+## Streaming Reading
+
+To improve the real-time behavior of Hive reads, Flink supports streaming reads of Hive tables:
+
+- For partitioned tables, monitor the creation of new partitions and read them incrementally.
+- For non-partitioned tables, monitor the creation of new files in the folder and read them incrementally.
+
+You can even use a 10-minute partitioning strategy and combine Flink's Hive streaming reading with
+Hive streaming writing to bring a Hive data warehouse to quasi-real-time, minute-level latency.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>streaming-source.enable</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable the streaming source. NOTE: For non-partitioned tables, please make sure each file is written atomically into the target directory; otherwise the reader may see incomplete data.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.monitor-interval</h5></td>
+        <td style="word-wrap: break-word;">1 m</td>
+        <td>Duration</td>
+        <td>Time interval for continuously monitoring partitions/files.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-order</h5></td>
+        <td style="word-wrap: break-word;">create-time</td>
+        <td>String</td>
+        <td>The consume order of the streaming source; supports 'create-time' and 'partition-time'. 'create-time' compares the partition/file creation time, which is not the partition creation time in the Hive metastore but the folder/file creation time in the filesystem; 'partition-time' compares the time represented by the partition name. For non-partitioned tables, this value must always be 'create-time'.</td>

Review comment:
       I see. If we change the default value to partition-time and the user is using a non-partitioned table, then the query will fail, right? If that's the case, then maybe we can go with create-time as the default, but with proper warnings.
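For context, a minimal sketch of the kind of query this option shapes (hypothetical:
it assumes dynamic table options / SQL hints are enabled so the option can be set per
query, and a Hive table named `hive_table`):

{% highlight sql %}
-- hypothetical: explicitly requesting partition-time ordering; per the
-- discussion above, this combination would fail on a non-partitioned table
SELECT * FROM hive_table
/*+ OPTIONS(
  'streaming-source.enable' = 'true',
  'streaming-source.consume-order' = 'partition-time'
) */;
{% endhighlight %}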