Hi, I want to test Flink SQL locally by consuming Kafka data with Flink 1.7, but it fails with the exception below.

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=fetch.message.max.bytes
connector.properties.0.value=10485760
connector.properties.1.key=zookeeper.connect
connector.properties.1.value=10.xxx.:2181/kafka
connector.properties.2.key=group.id
connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=10.xxx:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=-flink-test
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=rideId
schema.0.type=VARCHAR
schema.1.name=lon
schema.1.type=VARCHAR

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory

    at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
    at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
    at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at TableSourceFinder.main(TableSourceFinder.java:40)

Here is my code:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);

        Kafka kafka = new Kafka();
        Properties properties = new Properties();
        String zkString = "10.xxx:2181/kafka";
        String brokerList = "10.xxx:9092";
        properties.setProperty("fetch.message.max.bytes", "10485760");
        properties.setProperty("group.id", UUID.randomUUID().toString());
        properties.setProperty("zookeeper.connect", zkString);
        properties.setProperty("bootstrap.servers", brokerList);
        kafka.version("0.8").topic("flink-test").properties(properties);
        kafka.startFromLatest();

        stEnv.connect(kafka)
            .withSchema(
                new Schema()
                    .field("rideId", Types.STRING())
                    .field("lon", Types.STRING()))
            .withFormat(new Json().deriveSchema())
            .registerTableSource("test");

        Table table = stEnv.sqlQuery("select rideId from test");
        DataStream ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
            .toAppendStream(table, Types.STRING());
        ds.print();
        env.execute("KafkaSql");
    }

And here is my pom.xml:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

As far as I can tell, I have all the required libraries in the pom, so I don't know why it fails when I test locally.

Thank you for any hints.

Yours,
Joshua