This example?:
 
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
    user_id,
    order_amount,
    DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
    DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

 
CREATE TABLE kafka_table - creates table, not file
CREATE TABLE fs_table - creates table, not file
INSERT INTO fs_table - inserts into table from another table
 
I do not see any create file section.
 
Anyway:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath.
 
 
Sent: Monday, July 11, 2022 at 10:31 AM
From: "Lijie Wang" <wangdachui9...@gmail.com>
To: pod...@gmx.com
Cc: user@flink.apache.org
Subject: Re: Is it possible to save Table to CSV?
You can use the FileSink and set the format to csv. An example of FileSink: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#full-example
 
Best,
Lijie
 
<pod...@gmx.com> 于2022年7月11日周一 16:16写道:
 
If I create dynamic table with:
 
CREATE TABLE some_table (name STRING, score INT)
WITH (
  'format' = 'csv',
  '...'
);
//do some other stuff here
 
Then how to save table result to CSV file?
 
Best,
 
Mike
 
 

Reply via email to