[ https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17825981#comment-17825981 ]
Danny Cranmer commented on FLINK-34044: --------------------------------------- Thanks [~chalixar] . I am on vacation this week, will take a look next week > Kinesis Sink Cannot be Created via TableDescriptor > -------------------------------------------------- > > Key: FLINK-34044 > URL: https://issues.apache.org/jira/browse/FLINK-34044 > Project: Flink > Issue Type: Bug > Components: Connectors / AWS > Affects Versions: aws-connector-4.2.0 > Reporter: Tilman Krokotsch > Priority: Major > Labels: pull-request-available > > When trying to create a Kinesis Stream Sink in Table API via a > TableDescriptor I get an error: > {code:java} > Caused by: java.lang.UnsupportedOperationException > at > java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460) > at > org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249) > at > org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158) > at > org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90) > at > org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61) > at > org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267) > ... 20 more > {code} > Here is a minimum reproducing example with Flink-1.17.2 and > flink-connector-kinesis-4.2.0: > {code:java} > public class Job { > public static void main(String[] args) throws Exception { > // create data stream environment > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv); > Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build(); > tEnv.createTemporaryTable( > "exampleTable", > TableDescriptor.forConnector("datagen").schema(a).build()); > TableDescriptor descriptor = > TableDescriptor.forConnector("kinesis") > .schema(a) > .format("json") > .option("stream", "abc") > .option("aws.region", "eu-central-1") > .build(); > tEnv.createTemporaryTable("sinkTable", descriptor); > tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here > } > } {code} > From my investigation, the error is triggered by the `ResolvedCatalogTable` > used when re-mapping the deprecated Kinesis options in > `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns > an `UnmodifiableMap` which is not mutable. > If the sink table is created via SQL, the error does not occur: > {code:java} > tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString()); > {code} > because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`. -- This message was sent by Atlassian Jira (v8.20.10#820010)