format and schema should be at the same level.
For reference, see the configuration of TableNumber1 in the flink-sql-client tests: test-sql-client-defaults.yaml
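For illustration, here is a minimal sketch of the corrected table definition, with schema moved up to be a sibling of format instead of nested under it (field names taken from the config quoted below; not tested against 1.7.2):

```yaml
tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/home/hadoop/flink_test/input.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    # schema is a sibling of format, so it flattens to
    # schema.#.name / schema.#.type (not format.schema.#.type)
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
```

As for the earlier question about a Hive-style '\036' field delimiter: a YAML double-quoted string can express that control character as "\x1e" (decimal 30), e.g. field-delimiter: "\x1e", but whether the 1.7.2 CSV factory accepts it has not been verified here.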

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 4:09 PM 曾晓勇 <[email protected]> wrote:

> @[email protected]
>  
> Thanks. I checked the formatting afterwards and it is now correct, but the
> latest release downloaded from the Flink website fails at startup with the
> error below. I am currently trying to see whether a few production scripts
> can be converted directly to Flink SQL instead of Java programs, since
> porting them in code would be a large effort.
>      Flink version: flink-1.7.2-bin-hadoop28-scala_2.11
>      Launch command: /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e
> /home/hadoop/flink_test/env.yaml
>
>
> [hadoop@server2 bin]$ /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> -e /home/hadoop/flink_test/env.yaml
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> set.
> No default environment specified.
> Searching for
> '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> Validating current environment...
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: The configured
> environment is invalid. Please check your environment files again.
>         at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
>         at
> org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
>         ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.StreamTableSourceFactory' in
> the classpath.
>
>
> Reason:
> The matching factory
> 'org.apache.flink.table.sources.CsvAppendTableSourceFactory' doesn't
> support 'format.schema.#.type'.
>
>
> Supported properties of this factory are:
> connector.path
> connector.path
> format.comment-prefix
> format.field-delimiter
> format.fields.#.name
> format.fields.#.type
> format.ignore-first-line
> format.ignore-parse-errors
> format.line-delimiter
> format.quote-character
> schema.#.name
> schema.#.type
>
>
> The following properties are requested:
> connector.path=/home/hadoop/flink_test/input.csv
> connector.type=filesystem
> format.comment-prefix=#
> format.fields.0.name=MyField1
> format.fields.0.type=INT
> format.fields.1.name=MyField2
> format.fields.1.type=VARCHAR
> format.line-delimiter=\n
> format.schema.0.name=MyField1
> format.schema.0.type=INT
> format.schema.1.name=MyField2
> format.schema.1.type=VARCHAR
> format.type=csv
> update-mode=append
>
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>
>
>         at
> org.apache.flink.table.factories.TableFactoryService$.filterBySupportedProperties(TableFactoryService.scala:277)
>         at
> org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:136)
>         at
> org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
>         at
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
>         at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
>         ... 4 more
> [hadoop@server2 bin]$
>
> -- The configuration after fixing the formatting:
> tables:
>  - name: MyTableSource
>    type: source-table
>    update-mode: append
>    connector:
>      type: filesystem
>      path: "/home/hadoop/flink_test/input.csv"
>    format:
>     type: csv
>     fields:
>         - name: MyField1
>           type: INT
>         - name: MyField2
>           type: VARCHAR
>     line-delimiter: "\n"
>     comment-prefix: "#"
>     schema:
>         - name: MyField1
>           type: INT
>         - name: MyField2
>           type: VARCHAR
>  - name: MyCustomView
>    type: view
>    query: "SELECT MyField2 FROM MyTableSource"
> # Execution properties allow for changing the behavior of a table program.
> execution:
>  type: streaming
> # required: execution mode either 'batch' or 'streaming'
>  result-mode: table
> # required: either 'table' or 'changelog'
>  max-table-result-rows: 1000000
> # optional: maximum number of maintained rows in table mode (1000000 by
> # default, smaller 1 means unlimited)
>  time-characteristic: event-time
> # optional: 'processing-time' or 'event-time' (default)
>  parallelism: 1
> # optional: Flink's parallelism (1 by default)
>  periodic-watermarks-interval: 200
> # optional: interval for periodic watermarks (200 ms by default)
>  max-parallelism: 16
> # optional: Flink's maximum parallelism (128 by default)
>  min-idle-state-retention: 0
> # optional: table program's minimum idle state time
>  max-idle-state-retention: 0
> # optional: table program's maximum idle state time
>  restart-strategy:
> # optional: restart strategy
>    type: fallback
> # "fallback" to the global restart strategy by default
> # Deployment properties allow for describing the cluster to which table
> # programs are submitted.
> deployment:
>   response-timeout: 5000
>
> ------------------ Original message ------------------
> From: "浪人"<[email protected]>;
> Sent: Monday, April 1, 2019, 3:46 PM
> To: "user-zh"<[email protected]>;
>
> Subject: Re: [Flink SQL] Cannot start env.yaml
>
>
>
> The format is fine.
>
>
> ------------------ Original message ------------------
> From: "Zhenghua Gao"<[email protected]>;
> Sent: Monday, April 1, 2019, 3:40 PM
> To: "user-zh"<[email protected]>;
>
> Subject: Re: [Flink SQL] Cannot start env.yaml
>
>
>
> The yaml format is invalid; it looks like an indentation problem.
> You can verify it with an online yaml editor, e.g. [1].
> For more on yaml syntax, see [2][3].
>
> [1] http://nodeca.github.io/js-yaml/
> [2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
> [3] https://en.wikipedia.org/wiki/YAML
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Apr 1, 2019 at 11:51 AM 曾晓勇 <[email protected]> wrote:
>
> > Hi all,
> >
> >        Today Flink SQL failed to start during a test; the error log is below.
> > What should I pay attention to in the yaml config file format? Can the field
> > delimiter be a special character such as '\036', the delimiter used in Hive
> > DDL statements? The detailed error log follows.
> >
> > [root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded
> > -e /home/hadoop/flink_test/env.yaml
> > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was
> > set.
> > No default environment specified.
> > Searching for
> > '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
> > Reading default environment from:
> > file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
> > Reading session environment from: file:/home/hadoop/flink_test/env.yaml
> >
> >
> > Exception in thread "main"
> > org.apache.flink.table.client.SqlClientException: Could not parse
> > environment file. Cause: YAML decoding problem: while parsing a block
> > collection
> >  in 'reader', line 2, column 2:
> >      - name: MyTableSource
> >      ^
> > expected <block end>, but found BlockMappingStart
> >  in 'reader', line 17, column 3:
> >       schema:
> >       ^
> >  (through reference chain:
> > org.apache.flink.table.client.config.Environment["tables"])
> >         at
> >
> org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
> >         at
> >
> org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
> >         at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
> >         at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> >
> >
> >
> >
> > -- Config file env.yaml:
> > tables:
> >  - name: MyTableSource
> >    type: source-table
> >    update-mode: append
> >    connector:
> >      type: filesystem
> >      path: "/home/hadoop/flink_test/input.csv"
> >    format:
> >     type: csv
> >     fields:
> >         - name: MyField1
> >           type: INT
> >         - name: MyField2
> >           type: VARCHAR
> >     line-delimiter: "\n"
> >     comment-prefix: "#"
> >   schema:
> >         - name: MyField1
> >         type: INT
> >         - name: MyField2
> >         type: VARCHAR
> >  - name: MyCustomView
> >    type: view
> >    query: "SELECT MyField2 FROM MyTableSource"
> > # Execution properties allow for changing the behavior of a table program.
> > execution:
> >  type: streaming # required: execution mode either 'batch' or 'streaming'
> >  result-mode: table # required: either 'table' or 'changelog'
> >  max-table-result-rows: 1000000 # optional: maximum number of maintained
> > # rows in 'table' mode (1000000 by default, smaller 1 means unlimited)
> >  time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default)
> >  parallelism: 1 # optional: Flink's parallelism (1 by default)
> >  periodic-watermarks-interval: 200 # optional: interval for periodic
> > # watermarks (200 ms by default)
> >  max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
> >  min-idle-state-retention: 0 # optional: table program's minimum idle state time
> >  max-idle-state-retention: 0 # optional: table program's maximum idle state time
> >  restart-strategy: # optional: restart strategy
> >    type: fallback # "fallback" to global restart strategy by default
> > # Deployment properties allow for describing the cluster to which table
> > # programs are submitted.
> > deployment:
> >   response-timeout: 5000
> >
> >
