Hi,

Please verify that:
1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf
your-program.jar | grep KafkaDynamicTableFactory")
2. kafka-connector version matches the version of Flink distribution on EMR.

Regards,
Roman


On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Hi,
>
> I could not launch my flink 1.11.2 application on EMR with exception
>
> Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
>
> I attached the full log at the end. After checking some other threads and
> none applies in my case. here is my observation:
>
> 1. dependency check: both flink-connector-kafka and flink-json are
> included in the final fat jar.
> 2.
> resources/META-INF/services/org.apache.flink.table.factories.TableFactory
> has the following and is included in the final fat jar.
>   - org.apache.flink.formats.json.JsonRowFormatFactory
>   - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>   also noticed that only identifier datagen is shown in the log. No
> kafka or json in there.
> 3. local IntelliJ running fine.
> 4. same jar on EMR not working
>
> Please advise.
> Thanks,
> Fanbin
>
>
>
>
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a source for reading table
> 'default_catalog.default_database.analytics_service'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'json.ignore-parse-errors'='true'
> 'properties.bootstrap.servers'='localhost:9093'
> 'properties.group.id'='xxx'
> 'properties.security.protocol'='SSL'
> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
> 'properties.ssl.key.password'='secret'
> 'properties.ssl.keystore.location'='xxx.jks'
> 'properties.ssl.keystore.password'='secret'
> 'properties.ssl.keystore.type'='JKS'
> 'properties.ssl.truststore.location'='xxx.jks'
> 'properties.ssl.truststore.password'='secret'
> 'properties.ssl.truststore.type'='JKS'
> 'properties.zookeeper.connect'='localhost:2181'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='events'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
> at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
> at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
> at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
> at
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
> at
> com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a connector using option ''connector'='kafka''.
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> ... 43 more
>
> *Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.*
> Available factory identifiers are:
>
> *datagen*
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 44 more
>
>
>
>

Reply via email to