Hi Joshua,

According to the property list, you passed "connector.version=0.10", so a Kafka 0.8 factory will not match.

Are you sure you are compiling the right thing? There seems to be a mismatch between your screenshot and the exception.
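
To illustrate why "No context matches" is reported, here is a minimal sketch of the idea behind context matching. It is not Flink's actual implementation: the class ContextMatchSketch, its methods, and the two-entry context for the Kafka 0.8 factory are assumptions made for illustration, and the real factory may require more keys. The point is only that a factory is skipped as soon as one of its required key/value pairs differs from the requested properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of context matching (not Flink code).
class ContextMatchSketch {

    // True only if every key/value pair the factory requires is present,
    // with the same value, in the requested properties.
    static boolean matchesContext(Map<String, String> requiredContext,
                                  Map<String, String> requested) {
        for (Map.Entry<String, String> entry : requiredContext.entrySet()) {
            if (!entry.getValue().equals(requested.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Assumed context of a Kafka 0.8 factory (illustrative subset).
    static Map<String, String> kafka08Context() {
        Map<String, String> context = new LinkedHashMap<>();
        context.put("connector.type", "kafka");
        context.put("connector.version", "0.8");
        return context;
    }

    // A cut-down version of the requested properties from the exception,
    // with the connector version as a parameter.
    static Map<String, String> requested(String version) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.type", "kafka");
        properties.put("connector.version", version);
        properties.put("format.type", "json");
        return properties;
    }

    public static void main(String[] args) {
        // Version taken from the exception's property list: no match.
        System.out.println(matchesContext(kafka08Context(), requested("0.10"))); // false
        // Version taken from the posted code: match.
        System.out.println(matchesContext(kafka08Context(), requested("0.8")));  // true
    }
}
```

With "connector.version=0.10" in the request and only the 0.8 connector on the classpath, none of the considered factories can match, which is consistent with the exception.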

Regards,
Timo

On 11.01.19 at 15:43, Joshua Fan wrote:
Hi,

I want to test Flink SQL locally by consuming Kafka data in Flink 1.7, but I get the exception below.

        Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
        the classpath.

        Reason: No context matches.


        The following properties are requested:
        connector.properties.0.key=fetch.message.max.bytes
        connector.properties.0.value=10485760
        connector.properties.1.key=zookeeper.connect
        connector.properties.1.value=10.xxx.:2181/kafka
        connector.properties.2.key=group.id
        connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
        connector.properties.3.key=bootstrap.servers
        connector.properties.3.value=10.xxx:9092
        connector.property-version=1
        connector.startup-mode=latest-offset
        connector.topic=-flink-test
        connector.type=kafka
        connector.version=0.10
        format.derive-schema=true
        format.property-version=1
        format.type=json
        schema.0.name=rideId
        schema.0.type=VARCHAR
        schema.1.name=lon
        schema.1.type=VARCHAR


        The following factories have been considered:
        org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
        org.apache.flink.table.sources.CsvBatchTableSourceFactory
        org.apache.flink.table.sources.CsvAppendTableSourceFactory
        org.apache.flink.table.sinks.CsvBatchTableSinkFactory
        org.apache.flink.table.sinks.CsvAppendTableSinkFactory
        org.apache.flink.formats.json.JsonRowFormatFactory


        at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
        at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
        at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
        at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
        at TableSourceFinder.main(TableSourceFinder.java:40)


Here is my code:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);

    Kafka kafka = new Kafka();
    Properties properties = new Properties();
    String zkString = "10.xxx:2181/kafka";
    String brokerList = "10.xxx:9092";
    properties.setProperty("fetch.message.max.bytes", "10485760");
    properties.setProperty("group.id", UUID.randomUUID().toString());
    properties.setProperty("zookeeper.connect", zkString);
    properties.setProperty("bootstrap.servers", brokerList);
    kafka.version("0.8").topic("flink-test").properties(properties);
    kafka.startFromLatest();

    stEnv.connect(kafka)
            .withSchema(new Schema()
                    .field("rideId", Types.STRING())
                    .field("lon", Types.STRING()))
            .withFormat(new Json().deriveSchema())
            .registerTableSource("test");

    Table table = stEnv.sqlQuery("select rideId from test");
    DataStream ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
            .toAppendStream(table, Types.STRING());
    ds.print();
    env.execute("KafkaSql");
}

And here is my pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

As far as I can tell, I have all the required libraries in the pom, so I don't know why it fails when I test locally.

Thank you for any hints.

Yours
Joshua

