leishuiyu created FLINK-12531:
---------------------------------

             Summary: flink sql-client throw NoMatchingTableFactoryException
                 Key: FLINK-12531
                 URL: https://issues.apache.org/jira/browse/FLINK-12531
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client
    Affects Versions: 1.7.2
            Reporter: leishuiyu

1. bin/sql-client.sh embedded -e conf/sql-client-cp.yaml

2. sql-client-cp.yaml
{code:java}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.

# See the Table API & SQL documentation for details about supported properties.

#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list

# A typical table source definition looks like:
# - name: ...
#   type: source-table
#   connector: ...
#   format: ...
#   schema: ...

# A typical view definition looks like:
# - name: ...
#   type: view
#   query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
#   type: temporal-table
#   history-table: ...
#   time-attribute: ...
#   primary-key: ...

#==============================================================================
# User-defined functions
#==============================================================================

tables:
  - name: MyUserTable      # name the new table
    type: source           # declare if the table should be "source", "sink", or "both"
    update-mode: append    # specify the update-mode for streaming tables

    # declare the external system to connect to
    connector:
      type: kafka
      version: "0.11"
      topic: test-input
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: centos-6:2181
        - key: bootstrap.servers
          value: centos-6:9092

    # declare a format for this system
    format:
      type: avro
      avro-schema: >
        {
          "namespace": "org.myorganization",
          "type": "record",
          "name": "UserMessage",
          "fields": [
            {"name": "ts", "type": "string"},
            {"name": "user", "type": "long"},
            {"name": "message", "type": ["string", "null"]}
          ]
        }

    # declare the schema of the table
    schema:
      - name: rowtime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: from-field
            from: ts
          watermarks:
            type: periodic-bounded
            delay: "60000"
      - name: user
        type: BIGINT
      - name: message
        type: VARCHAR

# Define scalar, aggregate, or table functions here.

functions: [] # empty list

# A typical function definition looks like:
# - name: ...
#   from: class
#   class: ...
#   constructor: ...

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # controls how table programs are restarted in case of a failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0
{code}

3. The log shows:
{code:java}
No default environment specified.
Searching for '/opt/soft/flink-1.7.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/soft/flink-1.7.1/conf/sql-client-defaults.yaml
Reading session environment from: file:/opt/soft/flink-1.7.1/conf/sql-client-cp.yaml
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=centos-6:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=centos-6:9092
connector.startup-mode=earliest-offset
connector.topic=test-input
connector.type=kafka
connector.version=0.11
format.avro-schema={\n \"namespace\": \"org.myorganization\",\n \"type\": \"record\",\n \"name\": \"UserMessage\",\n \"fields\": [\n {\"name\": \"ts\", \"type\": \"string\"},\n {\"name\": \"user\", \"type\": \"long\"},\n {\"name\": \"message\", \"type\": [\"string\", \"null\"]}\n ]\n}\n
format.type=avro
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=ts
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=60000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=user
schema.1.type=BIGINT
schema.2.name=message
schema.2.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

    at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
    ... 4 more
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)