[ https://issues.apache.org/jira/browse/FLINK-34076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17806797#comment-17806797 ]

Danny Cranmer commented on FLINK-34076:
---------------------------------------

[~jiabao.sun] Ah, I missed that. Interesting.

> flink-connector-base missing fails kinesis table sink to create
> ---------------------------------------------------------------
>
>                 Key: FLINK-34076
>                 URL: https://issues.apache.org/jira/browse/FLINK-34076
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-4.2.0
>            Reporter: Khanh Vu
>            Priority: Major
>         Attachments: screenshot-1.png, screenshot-2.png, screenshot-3.png, screenshot-4.png
>
>
> The following issue occurs with flink-kinesis-connector v4.2.0 on Flink
> 1.17; it works properly with kinesis connector v4.1.0 (I have not tested
> versions before v4.1.0).
> The
> [commit|https://github.com/apache/flink-connector-aws/commit/01f112bd5a69f95cd5d2a4bc7e08d1ba9a81d56a]
> which stops bundling `flink-connector-base` with `flink-connector-kinesis`
> causes the kinesis sink to fail to create when using the Table API, because
> required classes from `flink-connector-base` are not loaded at runtime.
> E.g. with only the following dependency in pom.xml
> {code:java}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
> </dependency>
> {code}
> and a minimal job definition:
> {code:java}
> public static void main(String[] args) throws Exception {
>     // create data stream environment
>     StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>     sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
>     Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
>     TableDescriptor descriptor =
>             TableDescriptor.forConnector("kinesis")
>                     .schema(a)
>                     .format("json")
>                     .build();
>     tEnv.createTemporaryTable("sinkTable", descriptor);
>     tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()).print();
> }
> {code}
> the following exception will be thrown:
> {code:java}
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory
>     at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
>     at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:527) ~[?:?]
>     ... 28 more
> {code}
> The fix is to explicitly specify `flink-connector-base` as a dependency of the
> project:
> {code:java}
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kinesis</artifactId>
>     <version>${flink.connector.kinesis.version}</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-base</artifactId>
>     <version>${flink.version}</version>
>     <scope>provided</scope>
> </dependency>
> {code}
> In general, `flink-connector-base` should be pulled in by default when
> pulling in the kinesis connector; the current separation adds unnecessary
> hassle to using the connector.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
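
For anyone hitting the same ClassNotFoundException, a quick classpath probe may help confirm whether `flink-connector-base` is actually visible before the Table API tries to load the Kinesis sink factory. This is only a minimal sketch: the class name `ConnectorBaseCheck` and the helper `isOnClasspath` are hypothetical, and only the probed class name is taken from the stack trace in the issue. It checks the class loader that loaded the probe itself, which may differ from the user-code class loader on a real Flink cluster.

```java
public class ConnectorBaseCheck {

    // Class named in the reported stack trace; per the issue it comes from flink-connector-base.
    static final String REQUIRED_CLASS =
            "org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory";

    /**
     * Hypothetical helper: returns true if the named class can be loaded by the
     * class loader of this probe class. The class is not initialized.
     */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ConnectorBaseCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class itself or one of its dependencies is not visible.
            return false;
        }
    }

    public static void main(String[] args) {
        if (isOnClasspath(REQUIRED_CLASS)) {
            System.out.println("flink-connector-base is visible on the classpath");
        } else {
            System.out.println(
                    "flink-connector-base is not visible; declare it as an explicit dependency");
        }
    }
}
```

Running the probe with the same classpath as the job (for example from the IDE or the packaged jar) should report whether the explicit `flink-connector-base` dependency is being picked up.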