Hi Joshua,
According to the property list, you passed "connector.version=0.10", so a
Kafka 0.8 factory will not match.
Are you sure you are compiling the right thing? There seems to be a
mismatch between your screenshot and the exception.
Regards,
Timo
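If the exception reflects what you are actually running, the job requested the 0.10 connector while only the 0.8 factory is on the classpath. There are two ways to align them (a sketch, assuming you rebuild after changing either side): keep `kafka.version("0.8")` in the descriptor together with the 0.8 dependency, or switch the descriptor to "0.10" and swap in the 0.10 connector artifact, e.g.:

```xml
<!-- hypothetical pom change: use the 0.10 connector if the descriptor says version "0.10" -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```

Either way, the `connector.version` in the descriptor and the `flink-connector-kafka-*` artifact on the classpath must agree, otherwise the factory lookup fails with exactly this "No context matches" error.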
On 11.01.19 at 15:43, Joshua Fan wrote:
Hi,
I want to test Flink SQL locally by consuming Kafka data in Flink 1.7,
but it throws the exception below.
Exception in thread "main"
org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=fetch.message.max.bytes
connector.properties.0.value=10485760
connector.properties.1.key=zookeeper.connect
connector.properties.1.value=10.xxx.:2181/kafka
connector.properties.2.key=group.id
connector.properties.2.value=d4b53966-796e-4a2d-b6eb-a5c489db2b21
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=10.xxx:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=-flink-test
connector.type=kafka
connector.version=0.10
format.derive-schema=true
format.property-version=1
format.type=json
schema.0.name=rideId
schema.0.type=VARCHAR
schema.1.name=lon
schema.1.type=VARCHAR
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
at TableSourceFinder.main(TableSourceFinder.java:40)
Here is my code:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment stEnv = TableEnvironment.getTableEnvironment(env);

    Kafka kafka = new Kafka();
    Properties properties = new Properties();
    String zkString = "10.xxx:2181/kafka";
    String brokerList = "10.xxx:9092";
    properties.setProperty("fetch.message.max.bytes", "10485760");
    properties.setProperty("group.id", UUID.randomUUID().toString());
    properties.setProperty("zookeeper.connect", zkString);
    properties.setProperty("bootstrap.servers", brokerList);

    kafka.version("0.8").topic("flink-test").properties(properties);
    kafka.startFromLatest();

    stEnv.connect(kafka)
        .withSchema(new Schema()
            .field("rideId", Types.STRING())
            .field("lon", Types.STRING()))
        .withFormat(new Json().deriveSchema())
        .registerTableSource("test");

    Table table = stEnv.sqlQuery("select rideId from test");
    DataStream ds = ((org.apache.flink.table.api.java.StreamTableEnvironment) stEnv)
        .toAppendStream(table, Types.STRING());
    ds.print();

    env.execute("KafkaSql");
}
And here is my pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
As far as I can tell, all the required libraries are in the pom, so I
don't know why it fails when testing locally.
Thank you for any hints.
Yours
Joshua