[ 
https://issues.apache.org/jira/browse/FLINK-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128403#comment-17128403
 ] 

Dawid Wysakowicz edited comment on FLINK-18184 at 6/8/20, 3:56 PM:
-------------------------------------------------------------------

Yes, that's what we are trying to do. In the old factories (pre 1.11) we had a
problematic design where required properties had to match before we had any
context. {{connector.version}} was one of those required properties.

For the old factories we could print all required properties of all factories
along with all alternatives, but since we want to drop those interfaces anyway,
I am not sure it is worth reworking them now.

In the new design the only property we derive the context from is
{{connector}} (formerly {{connector.type}}), which lets us print a far more
useful exception that lists all possible alternatives.


> Could not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory'
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18184
>                 URL: https://issues.apache.org/jira/browse/FLINK-18184
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>         Environment: macOS (local), Flink 1.9
>  
>            Reporter: mzz
>            Priority: Major
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(5000) // checkpoint every 5000 msecs
>     // Kafka configuration
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
>     properties.setProperty("group.id", "km_aggs_group")
>     val fsSettings = 
> EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
>     val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new 
> SimpleStringSchema(), properties).setStartFromEarliest()
>     //    val source = env.addSource(kafkaConsumer)
>     val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
>     streamTableEnvironment.connect(new Kafka()
>       .topic(TOPIC)
>       .version(VERSION)
>       .startFromEarliest()
>       .property("bootstrap.servers", "172.16.30.207:9092")
>       .property("zookeeper.connect", "172.16.30.207:2181")
>       .property("group.id", "km_aggs_group_table")
>       //      .properties(properties)
>     )
>       .withFormat(
>         new Json()
>           .failOnMissingField(true)
>           .deriveSchema()
>       )
>       .withSchema(new Schema()
>         .field("advs", Types.STRING())
>         .field("devs", Types.STRING())
>         .field("environment", Types.STRING())
>         .field("events", Types.STRING())
>         .field("identity", Types.STRING())
>         .field("ip", Types.STRING())
>         .field("launchs", Types.STRING())
>         .field("ts", Types.STRING())
>       )
>       .inAppendMode()
>       .registerTableSource("aggs_test")
>     val tableResult = streamTableEnvironment.sqlQuery("select * from 
> aggs_test")
>     tableResult.printSchema()
> //    streamTableEnvironment.toAppendStream[Row](tableResult).print()
>     // start the job
>     env.execute("test_kafka")
> --------------------------------------------------------
> error message:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> findAndCreateTableSource failed.
>       at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>       at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>       at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>       at 
> KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
>       at 
> KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=172.16.30.207:2181
> connector.properties.1.key=group.id
> connector.properties.1.value=km_aggs_group_table
> connector.properties.2.key=bootstrap.servers
> connector.properties.2.value=172.16.30.207:9092
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=aggs_topic
> connector.type=kafka
> connector.version=2.0
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> schema.0.name=advs
> schema.0.type=VARCHAR
> schema.1.name=devs
> schema.1.type=VARCHAR
> schema.2.name=environment
> schema.2.type=VARCHAR
> schema.3.name=events
> schema.3.type=VARCHAR
> schema.4.name=identity
> schema.4.type=VARCHAR
> schema.5.name=ip
> schema.5.type=VARCHAR
> schema.6.name=launchs
> schema.6.type=VARCHAR
> schema.7.name=ts
> schema.7.type=VARCHAR
> update-mode=append
> The following factories have been considered:
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>       at 
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>       at 
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>       at 
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>       at 
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
>       at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
>       ... 4 more
> I've tried the suggestions in this Stack Overflow thread, but they did not
> solve my problem:
> https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto
> Can anyone help me? Thanks!
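
A likely explanation for the report above, consistent with the comment at the
top: the property dump shows {{connector.version=2.0}}, but the only Kafka
factory on the classpath is the universal
{{KafkaTableSourceSinkFactory}}, which matches
{{connector.version=universal}} (the 0.10/0.11 connectors ship their own
factories), so no context matches. A minimal sketch of the change, assuming
the universal connector jar is indeed the one on the classpath:

{code:scala}
streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version("universal") // instead of "2.0"; the universal factory only matches "universal"
  .startFromEarliest()
  .property("bootstrap.servers", "172.16.30.207:9092")
  .property("zookeeper.connect", "172.16.30.207:2181")
  .property("group.id", "km_aggs_group_table"))
{code}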



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
