[ https://issues.apache.org/jira/browse/FLINK-12531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937529#comment-16937529 ]
Pritam Sadhukhan commented on FLINK-12531:
------------------------------------------

I am facing the same issue with version 1.7.1 too.

> flink sql-client throw NoMatchingTableFactoryException
> ------------------------------------------------------
>
> Key: FLINK-12531
> URL: https://issues.apache.org/jira/browse/FLINK-12531
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.7.2
> Reporter: leishuiyu
> Priority: Major
> Attachments: image-2019-05-16-17-05-06-940.png, image-2019-05-16-17-07-14-883.png
>
>
> 1. bin/sql-client.sh embedded -e conf/sql-client-cp.yaml
> 2. sql-client-cp.yaml
> {code:java}
> // code placeholder
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> # This file defines the default environment for Flink's SQL Client.
> # Defaults might be overwritten by a session specific environment.
> # See the Table API & SQL documentation for details about supported properties.
> #==============================================================================
> # Tables
> #==============================================================================
> # Define tables here such as sources, sinks, views, or temporal tables.
> tables: [] # empty list
> # A typical table source definition looks like:
> # - name: ...
> #   type: source-table
> #   connector: ...
> #   format: ...
> #   schema: ...
> # A typical view definition looks like:
> # - name: ...
> #   type: view
> #   query: "SELECT ..."
> # A typical temporal table definition looks like:
> # - name: ...
> #   type: temporal-table
> #   history-table: ...
> #   time-attribute: ...
> #   primary-key: ...
> #==============================================================================
> # User-defined functions
> #==============================================================================
> tables:
>   - name: MyUserTable # name the new table
>     type: source # declare if the table should be "source", "sink", or "both"
>     update-mode: append # specify the update-mode for streaming tables
>     # declare the external system to connect to
>     connector:
>       type: kafka
>       version: "0.11"
>       topic: test-input
>       startup-mode: earliest-offset
>       properties:
>         - key: zookeeper.connect
>           value: centos-6:2181
>         - key: bootstrap.servers
>           value: centos-6:9092
>     # declare a format for this system
>     format:
>       type: avro
>       avro-schema: >
>         {
>           "namespace": "org.myorganization",
>           "type": "record",
>           "name": "UserMessage",
>           "fields": [
>             {"name": "ts", "type": "string"},
>             {"name": "user", "type": "long"},
>             {"name": "message", "type": ["string", "null"]}
>           ]
>         }
>     # declare the schema of the table
>     schema:
>       - name: rowtime
>         type: TIMESTAMP
>         rowtime:
>           timestamps:
>             type: from-field
>             from: ts
>           watermarks:
>             type: periodic-bounded
>             delay: "60000"
>       - name: user
>         type: BIGINT
>       - name: message
>         type: VARCHAR
> # Define scalar, aggregate, or table functions here.
> functions: [] # empty list
> # A typical function definition looks like:
> # - name: ...
> #   from: class
> #   class: ...
> #   constructor: ...
> #==============================================================================
> # Execution properties
> #==============================================================================
> # Execution properties allow for changing the behavior of a table program.
> execution:
>   # 'batch' or 'streaming' execution
>   type: streaming
>   # allow 'event-time' or only 'processing-time' in sources
>   time-characteristic: event-time
>   # interval in ms for emitting periodic watermarks
>   periodic-watermarks-interval: 200
>   # 'changelog' or 'table' presentation of results
>   result-mode: table
>   # maximum number of maintained rows in 'table' presentation of results
>   max-table-result-rows: 1000000
>   # parallelism of the program
>   parallelism: 1
>   # maximum parallelism
>   max-parallelism: 128
>   # minimum idle state retention in ms
>   min-idle-state-retention: 0
>   # maximum idle state retention in ms
>   max-idle-state-retention: 0
>   # controls how table programs are restarted in case of failures
>   restart-strategy:
>     # strategy type
>     # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
>     type: fallback
> #==============================================================================
> # Deployment properties
> #==============================================================================
> # Deployment properties allow for describing the cluster to which table
> # programs are submitted to.
> deployment:
>   # general cluster communication timeout in ms
>   response-timeout: 5000
>   # (optional) address from cluster to gateway
>   gateway-address: ""
>   # (optional) port from cluster to gateway
>   gateway-port: 0
> {code}
> 3. The log shows:
> * // code placeholder
> No default environment specified.
> Searching for '/opt/soft/flink-1.7.1/conf/sql-client-defaults.yaml'...found.
> Reading default environment from: file:/opt/soft/flink-1.7.1/conf/sql-client-defaults.yaml
> Reading session environment from: file:/opt/soft/flink-1.7.1/conf/sql-client-cp.yaml
> Validating current environment...
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
>     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
>     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
>     ... 2 more
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=centos-6:2181
> connector.properties.1.key=bootstrap.servers
> connector.properties.1.value=centos-6:9092
> connector.startup-mode=earliest-offset
> connector.topic=test-input
> connector.type=kafka
> connector.version=0.11
> format.avro-schema={\n \"namespace\": \"org.myorganization\",\n \"type\": \"record\",\n \"name\": \"UserMessage\",\n \"fields\": [\n {\"name\": \"ts\", \"type\": \"string\"},\n {\"name\": \"user\", \"type\": \"long\"},\n {\"name\": \"message\", \"type\": [\"string\", \"null\"]}\n ]\n}\n
> format.type=avro
> schema.0.name=rowtime
> schema.0.rowtime.timestamps.from=ts
> schema.0.rowtime.timestamps.type=from-field
> schema.0.rowtime.watermarks.delay=60000
> schema.0.rowtime.watermarks.type=periodic-bounded
> schema.0.type=TIMESTAMP
> schema.1.name=user
> schema.1.type=BIGINT
> schema.2.name=message
> schema.2.type=VARCHAR
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.formats.avro.AvroRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>     at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
>     at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
>     at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
>     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
>     ... 4 more
> 4. The Kafka jar flink-connector-kafka_2.11-1.7.2-sql-jar.jar is in /opt/soft/flink-1.7.1/lib and /opt/soft/flink-1.7.1/opt.
> 5. The Avro jar flink-avro-1.6.1-sql-jar.jar is in /opt/soft/flink-1.7.1/lib and /opt/soft/flink-1.7.1/opt.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)