[ https://issues.apache.org/jira/browse/FLINK-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128380#comment-17128380 ]
Chesnay Schepler edited comment on FLINK-18184 at 6/8/20, 3:22 PM: ------------------------------------------------------------------- [~dwysakowicz] Could we also output the available versions of each connector in the listing of considered factories (or more generally the required context I guess)? was (Author: zentol): [~dwysakowicz] Could we also output the available versions of each connector in the listing of considered factories? > Could not find a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' > ------------------------------------------------------------------------------------------------- > > Key: FLINK-18184 > URL: https://issues.apache.org/jira/browse/FLINK-18184 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.9.1 > Environment: local:macos > flink1.9 > > Reporter: mzz > Priority: Major > > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.enableCheckpointing(5000) // checkpoint every 5000 msecs > //kafak配置 > val properties = new Properties() > properties.setProperty("bootstrap.servers", "172.16.30.207:9092") > properties.setProperty("group.id", "km_aggs_group") > val fsSettings = > EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() > val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new > SimpleStringSchema(), properties).setStartFromEarliest() > // val source = env.addSource(kafkaConsumer) > val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings) > streamTableEnvironment.connect(new Kafka() > .topic(TOPIC) > .version(VERSION) > .startFromEarliest() > .property("bootstrap.servers", "172.16.30.207:9092") > .property("zookeeper.connect", "172.16.30.207:2181") > .property("group.id", "km_aggs_group_table") > // .properties(properties) > ) > .withFormat( > new Json() > .failOnMissingField(true) > .deriveSchema() > ) > .withSchema(new Schema() > .field("advs", Types.STRING()) > .field("devs", Types.STRING()) > .field("environment", Types.STRING()) > .field("events", Types.STRING()) > .field("identity", Types.STRING()) > .field("ip", Types.STRING()) > .field("launchs", Types.STRING()) > .field("ts", Types.STRING()) > ) > .inAppendMode() > .registerTableSource("aggs_test") > val tableResult = streamTableEnvironment.sqlQuery("select * from > aggs_test") > tableResult.printSchema() > // streamTableEnvironment.toAppendStream[Row](tableResult).print() > //启动程序 > env.execute("test_kafka") > -------------------------------------------------------- > erroe message: > Exception in thread "main" org.apache.flink.table.api.TableException: > findAndCreateTableSource failed. > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) > at > org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) > at > KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70) > at > KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could > not find a suitable table factory for > 'org.apache.flink.table.factories.TableSourceFactory' in > the classpath. > Reason: No context matches. > The following properties are requested: > connector.properties.0.key=zookeeper.connect > connector.properties.0.value=172.16.30.207:2181 > connector.properties.1.key=group.id > connector.properties.1.value=km_aggs_group_table > connector.properties.2.key=bootstrap.servers > connector.properties.2.value=172.16.30.207:9092 > connector.property-version=1 > connector.startup-mode=earliest-offset > connector.topic=aggs_topic > connector.type=kafka > connector.version=2.0 > format.derive-schema=true > format.fail-on-missing-field=true > format.property-version=1 > format.type=json > schema.0.name=advs > schema.0.type=VARCHAR > schema.1.name=devs > schema.1.type=VARCHAR > schema.2.name=environment > schema.2.type=VARCHAR > schema.3.name=events > schema.3.type=VARCHAR > schema.4.name=identity > schema.4.type=VARCHAR > schema.5.name=ip > schema.5.type=VARCHAR > schema.6.name=launchs > schema.6.type=VARCHAR > schema.7.name=ts > schema.7.type=VARCHAR > update-mode=append > The following factories have been considered: > org.apache.flink.formats.json.JsonRowFormatFactory > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.planner.StreamPlannerFactory > org.apache.flink.table.executor.StreamExecutorFactory > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > org.apache.flink.table.sinks.CsvBatchTableSinkFactory > org.apache.flink.table.sinks.CsvAppendTableSinkFactory > org.apache.flink.table.catalog.GenericInMemoryCatalogFactory > at > org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283) > at > org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191) > at > org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144) > at > org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64) > ... 4 more > I've tried these way,Didn't solve my > problem。[https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto|https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto] > Anyone help me ,THX! -- This message was sent by Atlassian Jira (v8.3.4#803005)