This example?:
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file'
);
-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
CREATE TABLE kafka_table - creates table, not file
CREATE TABLE fs_table - creates table, not file
INSERT INTO fs_table - inserts into table from another table
I do not see any create file section.
Anyway:Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath.Sent: Monday, July 11, 2022 at 10:31 AM
From: "Lijie Wang" <wangdachui9...@gmail.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Is it possible to save Table to CSV?You can use the FileSink and set the format to csv. An example of FileSink: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-exampleBest,Lijie<pod...@gmx.com> 于2022年7月11日周一 16:16写道:If I create dynamic table with:CREATE TABLE some_table (name STRING, score INT) WITH ( 'format' = 'csv', '...' );
//do some other stuff hereThen how to save table result to CSV file?Best,Mike