[ https://issues.apache.org/jira/browse/FLINK-33181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer updated FLINK-33181:
----------------------------------
    Affects Version/s: aws-connector-4.1.0

> Table using `kinesis` connector can not be used for both read & write operations if it's defined with unsupported sink property
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33181
>                 URL: https://issues.apache.org/jira/browse/FLINK-33181
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis, Table SQL / Runtime
>    Affects Versions: 1.15.4, aws-connector-4.1.0
>            Reporter: Khanh Vu
>            Assignee: Khanh Vu
>            Priority: Major
>             Fix For: aws-connector-4.2.0
>
>
> First, I define a table which uses `kinesis` connector with an unsupported property for sink, e.g. `scan.stream.initpos`:
> {code:sql}
> %flink.ssql(type=update)
> -- Create input
> DROP TABLE IF EXISTS `kds_input`;
> CREATE TABLE `kds_input` (
>   `some_string` STRING,
>   `some_int` BIGINT,
>   `time` AS PROCTIME()
> ) WITH (
>   'connector' = 'kinesis',
>   'stream' = 'ExampleInputStream',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'LATEST',
>   'format' = 'csv'
> );
> {code}
> I can read from my table (kds_input) without any issue, but it will throw exception if I try to write to the table:
> {code:sql}
> %flink.ssql(type=update)
> -- Use to generate data in the input table
> DROP TABLE IF EXISTS connector_cve_datagen;
> CREATE TABLE connector_cve_datagen(
>   `some_string` STRING,
>   `some_int` BIGINT
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '1',
>   'fields.some_string.length' = '2');
> INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
> {code}
> Exception observed:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
>
> Unsupported options:
>
> scan.stream.initpos
>
> Supported options:
>
> aws.region
> connector
> csv.allow-comments
> csv.array-element-delimiter
> csv.disable-quote-character
> csv.escape-character
> csv.field-delimiter
> csv.ignore-parse-errors
> csv.null-literal
> csv.quote-character
> format
> property-version
> sink.batch.max-size
> sink.fail-on-error
> sink.flush-buffer.size
> sink.flush-buffer.timeout
> sink.partitioner
> sink.partitioner-field-delimiter
> sink.producer.collection-max-count (deprecated)
> sink.producer.collection-max-size (deprecated)
> sink.producer.fail-on-error (deprecated)
> sink.producer.record-max-buffered-time (deprecated)
> sink.requests.max-buffered
> sink.requests.max-inflight
> stream
>     at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
>     at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
>     at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
>     at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
>     at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
>     at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
>     at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
>     ... 36 more
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
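
The stack trace suggests the failure comes from option validation in the sink factory: `scan.stream.initpos` does not appear in the sink's supported set, so it is reported as unsupported when the table is used as a sink. The following is a minimal, self-contained sketch of that kind of check, not Flink's actual code. The class `OptionValidationSketch`, the method `unsupportedKeys`, and the idea of skipping the `scan.` prefix are illustrative assumptions, and the supported set is only a subset of the list printed in the exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class OptionValidationSketch {

    /**
     * Hypothetical helper: returns, in sorted order, every provided option key
     * that is neither in the supported set nor covered by a skipped prefix.
     */
    public static List<String> unsupportedKeys(
            Set<String> providedKeys, Set<String> supportedKeys, String... prefixesToSkip) {
        List<String> unsupported = new ArrayList<>();
        // TreeSet gives a deterministic (alphabetical) iteration order.
        for (String key : new TreeSet<>(providedKeys)) {
            boolean skipped = Arrays.stream(prefixesToSkip).anyMatch(key::startsWith);
            if (!skipped && !supportedKeys.contains(key)) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        // Options from the kds_input table definition in the report.
        Set<String> provided = Set.of(
                "connector", "stream", "aws.region", "scan.stream.initpos", "format");
        // Subset of the sink's supported options listed in the exception.
        Set<String> sinkSupported = Set.of(
                "connector", "stream", "aws.region", "format", "sink.partitioner");

        // Strict check: the source-only option is rejected.
        System.out.println(unsupportedKeys(provided, sinkSupported));
        // Assumed alternative: ignore keys under the "scan." prefix when building a sink.
        System.out.println(unsupportedKeys(provided, sinkSupported, "scan."));
    }
}
```

With the strict check, the first call reports `scan.stream.initpos`; with the assumed `scan.` prefix skipped, nothing is reported. Whether the actual fix takes this approach is not stated in the issue.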