Hi,

Please verify that:
1. kafka-connector is indeed in the fat jar (e.g. by "jar vtf your-program.jar | grep KafkaDynamicTableFactory")
2. kafka-connector version matches the version of Flink distribution on EMR.

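The first check can also be scripted. The following is a minimal sketch, not an official Flink tool: the function name `inspect_fat_jar` is hypothetical, and it assumes that the new-style 'connector'='kafka' option is resolved through the ServiceLoader file META-INF/services/org.apache.flink.table.factories.Factory, while the TableFactory file lists only legacy factories. It reports whether the factory class is in the jar and which factory classes each service file registers.

```python
import zipfile

# Service files read through Java's ServiceLoader. Assumption: the Factory file
# is what new-style ('connector'='kafka') lookups use; the TableFactory file
# registers legacy factories only.
SERVICE_FILES = (
    "META-INF/services/org.apache.flink.table.factories.Factory",
    "META-INF/services/org.apache.flink.table.factories.TableFactory",
)


def inspect_fat_jar(jar_path, class_simple_name="KafkaDynamicTableFactory"):
    """Return (matching .class entries, {service file: registered class names})."""
    with zipfile.ZipFile(jar_path) as jar:
        names = jar.namelist()
        # Same idea as `jar vtf ... | grep KafkaDynamicTableFactory`.
        class_hits = [
            name for name in names
            if name.endswith("/" + class_simple_name + ".class")
        ]
        services = {}
        for service_file in SERVICE_FILES:
            if service_file not in names:
                continue  # a missing file is reported by its absence from the dict
            text = jar.read(service_file).decode("utf-8")
            # Keep only class names: skip blank lines and '#' comment lines.
            services[service_file] = [
                line.strip()
                for line in text.splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
    return class_hits, services
```

Calling `inspect_fat_jar("your-program.jar")` returns the matching class entries and a dict of service files. If the class is present but the Factory service file is missing or does not list it, the service files may have been overwritten rather than merged while building the fat jar; that is a possible cause to check, not a confirmed diagnosis for this job.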
Regards,
Roman

On Tue, Nov 17, 2020 at 6:47 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Hi,
>
> I could not launch my flink 1.11.2 application on EMR with exception
>
> Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
>
> I attached the full log at the end. After checking some other threads and
> none applies in my case. here is my observation:
>
> 1. dependency check: both flink-connector-kafka and flink-json are
>    included in the final fat jar.
> 2. resources/META-INF/services/org.apache.flink.table.factories.TableFactory
>    has the following and is included in the final fat jar.
>    - org.apache.flink.formats.json.JsonRowFormatFactory
>    - org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>    also noticed that only identifier datagen is shown in the log. No
>    kafka or json in there.
> 3. local IntelliJ running fine.
> 4. same jar on EMR not working
>
> Please advise.
> Thanks,
> Fanbin
>
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a source for reading table
> 'default_catalog.default_database.analytics_service'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'json.ignore-parse-errors'='true'
> 'properties.bootstrap.servers'='localhost:9093'
> 'properties.group.id'='xxx'
> 'properties.security.protocol'='SSL'
> 'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1'
> 'properties.ssl.key.password'='secret'
> 'properties.ssl.keystore.location'='xxx.jks'
> 'properties.ssl.keystore.password'='secret'
> 'properties.ssl.keystore.type'='JKS'
> 'properties.ssl.truststore.location'='xxx.jks'
> 'properties.ssl.truststore.password'='secret'
> 'properties.ssl.truststore.type'='JKS'
> 'properties.zookeeper.connect'='localhost:2181'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='events'
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:140)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>     at com.coinbase.ml.FeatureStoreJob.runSqlQuery(FeatureStoreJob.scala:133)
>     at com.coinbase.ml.FeatureStoreJob.run(FeatureStoreJob.scala:36)
>     at com.coinbase.ml.RunFlinkJob$.runFlinkJob(RunFlinkJob.scala:30)
>     at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint$.main(CmdLineParser.scala:76)
>     at com.coinbase.ml.FlinkFeatureProcessingJobEntryPoint.main(CmdLineParser.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>     ... 11 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a connector using option ''connector'='kafka''.
>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
>     ... 43 more
>
> *Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.*
>
> Available factory identifiers are:
>
> *datagen*
>     at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>     ... 44 more