[ 
https://issues.apache.org/jira/browse/FLINK-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128372#comment-17128372
 ] 

Dawid Wysakowicz edited comment on FLINK-18184 at 6/8/20, 3:17 PM:
-------------------------------------------------------------------

I am quite sure this is because you used {{connector.version=2.0}}. The 
{{connector.version}} property describes the version of the connector, not the 
version of Kafka, and there is no connector with such a version. Try using 
{{"universal"}}. Also see the documentation for the Kafka connector at 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector:
{code}
  'connector.version' = '0.11',     -- required: valid connector versions are
                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"
{code}
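For the descriptor API used in your snippet, the fix boils down to passing that value in the {{version()}} call. A sketch (only the version argument changes; everything else can stay as you posted it):
{code}
streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version("universal")  // instead of "2.0" — pass a connector version, not a broker version
  .startFromEarliest()
  // ... remaining properties unchanged
)
{code}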

Let me know if it helps so that I can close the ticket.

Side note: before opening a ticket like this, it is usually worth reaching out to 
the mailing list first, unless you are sure there is a bug in Flink. 



> Could not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory'
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18184
>                 URL: https://issues.apache.org/jira/browse/FLINK-18184
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>         Environment: local:macos
> flink1.9
>  
>            Reporter: mzz
>            Priority: Major
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(5000) // checkpoint every 5000 msecs
>     // Kafka configuration
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
>     properties.setProperty("group.id", "km_aggs_group")
>     val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
>     val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new SimpleStringSchema(), properties).setStartFromEarliest()
>     //    val source = env.addSource(kafkaConsumer)
>     val streamTableEnvironment = StreamTableEnvironment.create(env, fsSettings)
>     streamTableEnvironment.connect(new Kafka()
>       .topic(TOPIC)
>       .version(VERSION)
>       .startFromEarliest()
>       .property("bootstrap.servers", "172.16.30.207:9092")
>       .property("zookeeper.connect", "172.16.30.207:2181")
>       .property("group.id", "km_aggs_group_table")
>       //      .properties(properties)
>     )
>       .withFormat(
>         new Json()
>           .failOnMissingField(true)
>           .deriveSchema()
>       )
>       .withSchema(new Schema()
>         .field("advs", Types.STRING())
>         .field("devs", Types.STRING())
>         .field("environment", Types.STRING())
>         .field("events", Types.STRING())
>         .field("identity", Types.STRING())
>         .field("ip", Types.STRING())
>         .field("launchs", Types.STRING())
>         .field("ts", Types.STRING())
>       )
>       .inAppendMode()
>       .registerTableSource("aggs_test")
>     val tableResult = streamTableEnvironment.sqlQuery("select * from aggs_test")
>     tableResult.printSchema()
> //    streamTableEnvironment.toAppendStream[Row](tableResult).print()
>     // start the program
>     env.execute("test_kafka")
> --------------------------------------------------------
> error message:
> Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>       at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>       at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>       at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>       at KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
>       at KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=172.16.30.207:2181
> connector.properties.1.key=group.id
> connector.properties.1.value=km_aggs_group_table
> connector.properties.2.key=bootstrap.servers
> connector.properties.2.value=172.16.30.207:9092
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=aggs_topic
> connector.type=kafka
> connector.version=2.0
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> schema.0.name=advs
> schema.0.type=VARCHAR
> schema.1.name=devs
> schema.1.type=VARCHAR
> schema.2.name=environment
> schema.2.type=VARCHAR
> schema.3.name=events
> schema.3.type=VARCHAR
> schema.4.name=identity
> schema.4.type=VARCHAR
> schema.5.name=ip
> schema.5.type=VARCHAR
> schema.6.name=launchs
> schema.6.type=VARCHAR
> schema.7.name=ts
> schema.7.type=VARCHAR
> update-mode=append
> The following factories have been considered:
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>       at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>       at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>       at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>       at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
>       at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
>       ... 4 more
> I've tried the suggestions from 
> https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto 
> but they didn't solve my problem. Can anyone help? Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
