Hi Günter,

Currently the file system connector only supports the OldCsv format [1], but your YAML uses properties of the new CSV format, such as "format.allow-comments".
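As a rough, untested sketch, assuming the Flink 1.9 SQL Client YAML layout, a table definition that sticks to old CSV properties might look like the following. The `fields` and `schema` entries (a single VARCHAR column named `test`) and the `comment-prefix` value are assumptions about your file and your intent, not something taken from your configuration.

```yaml
tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "file:///home/swissbib/temp/trash/hello.txt"
    format:
      type: csv
      fields:                     # old CSV format: ordered list of fields (assumed: one string column)
        - name: test
          type: VARCHAR
      field-delimiter: ";"        # same delimiter as in your file
      comment-prefix: "#"         # assumed old-format counterpart of allow-comments
      ignore-parse-errors: true   # skip records that fail to parse
    schema:                       # table schema, assumed to mirror the format fields
      - name: test
        type: VARCHAR
```

As far as I can tell from [2], properties such as allow-comments, array-element-delimiter, escape-character, null-literal, derive-schema and the schema string belong to the new CSV format only, so they are left out here.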
You can check the properties supported by the old CSV format in [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#old-csv-format

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 1:00 PM Günter Hipler <[email protected]> wrote:

> Hi,
>
> I can't read from a file source using the sql-client tool.
>
> I just set up a simple test scenario with the configuration file in [1]
>
> I'm getting the error in [2] starting the environment with
> bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
> in the standard Flink 1.9.1 download environment.
>
> Reading the documentation
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
> I expected the file system connectors as "built in"
>
> I'm getting a similar issue while following the sql-training tutorial
> from https://github.com/ververica/sql-training/wiki. There I changed the
> used sql-client-conf.yaml file of the docker container for the sql client.
>
> Thanks for any hints
>
> Günter
>
> [1]
>
> ################################################################################
> # Define table sources here. See the Table API & SQL documentation for details.
>
> tables:
>   - name: Guenter
>     type: source-table
>     update-mode: append
>     connector:
>       type: filesystem
>       path: file:///home/swissbib/temp/trash/hello.txt
>
>     format:
>       type: csv
>       # required: define the schema either by using type information
>       schema: "ROW(test STRING)"
>
>       # or use the table's schema
>       derive-schema: true
>
>       field-delimiter: ";"           # optional: field delimiter character (',' by default)
>       line-delimiter: "\r\n"         # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
>       quote-character: "'"           # optional: quote character for enclosing field values ('"' by default)
>       allow-comments: true           # optional: ignores comment lines that start with "#" (disabled by default)
>                                      #   if enabled, make sure to also ignore parse errors to allow empty rows
>       ignore-parse-errors: true      # optional: skip fields and rows with parse errors instead of failing;
>                                      #   fields are set to null in case of errors
>       array-element-delimiter: "|"   # optional: the array element delimiter string for separating
>                                      #   array and row element values (";" by default)
>       escape-character: "\\"         # optional: escape character for escaping values (disabled by default)
>       null-literal: "n/a"            # optional: null literal string that is interpreted as a
>                                      #   null value (disabled by default)
>
> #==============================================================================
> # Execution properties
> #==============================================================================
>
> # Execution properties allow for changing the behavior of a table program.
>
> execution:
>   #planner: blink
>   type: streaming                   # 'batch' or 'streaming' execution
>   result-mode: table                # 'changelog' or 'table' presentation of results
>   parallelism: 1                    # parallelism of the program
>   max-parallelism: 128              # maximum parallelism
>   min-idle-state-retention: 0       # minimum idle state retention in ms
>   max-idle-state-retention: 0       # maximum idle state retention in ms
>
> #==============================================================================
> # Deployment properties
> #==============================================================================
>
> # Deployment properties allow for describing the cluster to which table
> # programs are submitted to.
>
> deployment:
>   type: standalone                  # only the 'standalone' deployment is supported
>   response-timeout: 5000            # general cluster communication timeout in ms
>   gateway-address: ""               # (optional) address from cluster to gateway
>   gateway-port: 0                   # (optional) port from cluster to gateway
>
> [2]
>
> bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
> Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
> No session environment specified.
> Validating current environment...
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
>         at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
>         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
>         at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
>         at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
>         at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
>         ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: No factory supports all properties.
>
> The following properties are requested:
> connector.path=file:///home/swissbib/temp/trash/hello.txt
> connector.type=filesystem
> format.allow-comments=true
> format.array-element-delimiter=|
> format.derive-schema=true
> format.escape-character=\\
> format.field-delimiter=;
> format.ignore-parse-errors=true
> format.line-delimiter=\r\n
> format.null-literal=n/a
> format.quote-character='
> format.schema=ROW(test STRING)
> format.type=csv
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>         at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:370)
>         at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:197)
>         at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>         at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
>         at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:265)
>         at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$1(ExecutionContext.java:144)
>         at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>         at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:142)
>         at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:558)
>         ... 4 more

--
Best, Jingsong Lee
