The Kafka connector JAR is missing from your classpath.
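
The "factories have been considered" list in your trace contains no Kafka factory, which is what points to the classpath. One way to confirm this from inside the job is sketched below. It is only an illustration: `ClasspathCheck` is a hypothetical helper, and it assumes that in Flink 1.9 the universal Kafka connector ships `org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory`, that flink-json ships `org.apache.flink.formats.json.JsonRowFormatFactory`, and that both are registered through the `META-INF/services/org.apache.flink.table.factories.TableFactory` service file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink.
class ClasspathCheck {

    // Assumed location of the table factory registrations in Flink 1.9.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.TableFactory";

    /** Returns true if the named class is visible to this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // 'false' skips static initialization; we only test visibility.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Collects the factory class names listed in every visible service file. */
    static List<String> registeredFactories() throws IOException {
        List<String> names = new ArrayList<>();
        Enumeration<URL> files =
                ClasspathCheck.class.getClassLoader().getResources(SERVICE_FILE);
        while (files.hasMoreElements()) {
            URL file = files.nextElement();
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    line = line.trim();
                    // Skip blank lines and '#' comments such as license headers.
                    if (!line.isEmpty() && !line.startsWith("#")) {
                        names.add(line);
                    }
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Class names below are assumptions about Flink 1.9, see the note above.
        String kafka = "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory";
        String json = "org.apache.flink.formats.json.JsonRowFormatFactory";
        System.out.println("Kafka factory visible: " + isOnClasspath(kafka));
        System.out.println("JSON factory visible:  " + isOnClasspath(json));
        for (String name : registeredFactories()) {
            System.out.println("registered: " + name);
        }
    }
}
```

If you call `ClasspathCheck.main(args)` at the start of your job's `main` method and the Kafka line prints `false`, the connector was probably not packaged into the job JAR and is not in Flink's `lib` directory either.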

*Best Regards,*
*Zhenghua Gao*

On Mon, Dec 2, 2019 at 2:14 PM srikanth flink <flink.d...@gmail.com> wrote:

> Hi there,
>
> I'm following the documentation at
> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html>
> to read JSON data from Kafka and convert it to a table programmatically. I
> have already tried this declaratively with the SQL client, and it worked.
>
> My JSON data is nested like: {a:1,b:2,c:{x:1,y:2}}.
> Code:
>
>> String schema = "{type: 'object', properties: {'message': {type: 'string'},'@timestamp': {type: 'string'}}}";
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().getCheckpointTimeout();
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>
>> tableEnv.connect(new Kafka().version("universal").topic("recon-data").startFromEarliest()
>>         .property("zookeeper.connect", "localhost:2181")
>>         .property("bootstrap.servers", "localhost:9092"))
>>     .withFormat(new Json().failOnMissingField(false).jsonSchema(schema).deriveSchema())
>>     .withSchema(new Schema().field("message", Types.STRING()).field("@timestamp", Types.LOCAL_DATE_TIME()))
>>     .inAppendMode().registerTableSource("reconTableS");
>>
>> Table t = tableEnv.sqlQuery("select * from reconTableS");
>> DataStream<Row> out = tableEnv.toAppendStream(t, Row.class);
>> out.print();
>>
>> try {
>>     env.execute("Flink Example Json");
>> } catch (Exception e) {
>>     e.printStackTrace();
>> }
>> }
>>
>
> pom.xml:
>
>> <properties>
>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>> <flink.version>1.9.0</flink.version>
>> <java.version>1.8</java.version>
>> <scala.binary.version>2.11</scala.binary.version>
>> <maven.compiler.source>${java.version}</maven.compiler.source>
>> <maven.compiler.target>${java.version}</maven.compiler.target>
>> </properties>
>>
>> <dependencies>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-scala_2.11</artifactId>
>> <version>${flink.version}</version>
>> <!-- <scope>provided</scope> -->
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-common</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-planner_2.11</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-java</artifactId>
>> <version>${flink.version}</version>
>> <!-- <scope>provided</scope> -->
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> <!-- <scope>provided</scope> -->
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-connector-kafka_2.12</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-json</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-cep_2.11</artifactId>
>> <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>> <groupId>mysql</groupId>
>> <artifactId>mysql-connector-java</artifactId>
>> <version>5.1.39</version>
>> </dependency>
>> </dependencies>
>>
>
> The code threw the following error:
>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>         at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
>>         at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>>         at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>>         at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>>         at kafka.flink.stream.list.match.ExampleJsonParser.main(ExampleJsonParser.java:31)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>         ... 12 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>>
>> Reason: No context matches.
>> The following properties are requested:
>> connector.properties.0.key=zookeeper.connect
>> connector.properties.0.value=localhost:2181
>> connector.properties.1.key=bootstrap.servers
>> connector.properties.1.value=localhost:9092
>> connector.property-version=1
>> connector.startup-mode=earliest-offset
>> connector.topic=recon-data
>> connector.type=kafka
>> connector.version=universal
>> format.derive-schema=true
>> format.fail-on-missing-field=false
>> format.property-version=1
>> format.type=json
>> schema.0.name=message
>> schema.0.type=VARCHAR
>> schema.1.name=@timestamp
>> schema.1.type=TIMESTAMP
>> update-mode=append
>>
>> The following factories have been considered:
>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> org.apache.flink.table.planner.delegation.BlinkPlannerFactory
>> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>> org.apache.flink.table.planner.StreamPlannerFactory
>> org.apache.flink.table.executor.StreamExecutorFactory
>>         at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
>>         at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
>>         at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
>>         at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
>>         at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
>>         ... 20 more
>>
>
> Please help me understand what I am missing.
>
