Hi,

I want to test Flink SQL locally by consuming Kafka data with Flink 1.7, but
it fails with the exception below.

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=fetch.message.max.bytes
connector.properties.0.value=10485760
connector.properties.1.key=zookeeper.connect
connector.properties.1.value=10.xxx.:2181/kafka
connector.properties.2.key=group.id
connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=10.xxx:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=-flink-test
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=rideId
schema.0.type=VARCHAR
schema.1.name=lon
schema.1.type=VARCHAR

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory

    at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at TableSourceFinder.main(TableSourceFinder.java:40)

Here is my code:

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);
        Kafka kafka = new Kafka();
        Properties properties = new Properties();
        String zkString = "10.xxx:2181/kafka";
        String brokerList = "10.xxx:9092";

        properties.setProperty("fetch.message.max.bytes", "10485760");
        properties.setProperty("group.id", UUID.randomUUID().toString());
        properties.setProperty("zookeeper.connect", zkString);
        properties.setProperty("bootstrap.servers", brokerList);
        kafka.version("0.8").topic("flink-test").properties(properties);
        kafka.startFromLatest();
        stEnv.connect(kafka)
                .withSchema(new Schema()
                        .field("rideId", Types.STRING())
                        .field("lon", Types.STRING()))
                .withFormat(new Json().deriveSchema())
                .registerTableSource("test");

        Table table = stEnv.sqlQuery("select rideId from test");
        DataStream ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
                .toAppendStream(table, Types.STRING());
        ds.print();
        env.execute("KafkaSql");
}


And here is my pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>


As far as I can tell, all the required libraries are in the pom, so I don't
understand why it fails when I test locally.

Thank you for any hints.

Yours
Joshua
