[ https://issues.apache.org/jira/browse/FLINK-33181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Khanh Vu updated FLINK-33181:
-----------------------------
    Description: 
First, I define a table that uses the `kinesis` connector with a property the sink does not support, e.g. `scan.stream.initpos`:

```
%flink.ssql(type=update)
-- Create input
DROP TABLE IF EXISTS `kds_input`;
CREATE TABLE `kds_input` (
  `some_string` STRING,
  `some_int` BIGINT,
  `time` AS PROCTIME()
) WITH (
  'connector' = 'kinesis',
  'stream' = 'ExampleInputStream',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```
I can read from the table (`kds_input`) without any issue, but an exception is thrown when I try to write to it:
```
%flink.ssql(type=update)
-- Used to generate data in the input table
DROP TABLE IF EXISTS connector_cve_datagen;
CREATE TABLE connector_cve_datagen(
  `some_string` STRING,
  `some_int` BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.some_string.length' = '2');
INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
```
Exception observed:
```
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
Unsupported options:
scan.stream.initpos
Supported options:
aws.region
connector
csv.allow-comments
csv.array-element-delimiter
csv.disable-quote-character
csv.escape-character
csv.field-delimiter
csv.ignore-parse-errors
csv.null-literal
csv.quote-character
format
property-version
sink.batch.max-size
sink.fail-on-error
sink.flush-buffer.size
sink.flush-buffer.timeout
sink.partitioner
sink.partitioner-field-delimiter
sink.producer.collection-max-count (deprecated)
sink.producer.collection-max-size (deprecated)
sink.producer.fail-on-error (deprecated)
sink.producer.record-max-buffered-time (deprecated)
sink.requests.max-buffered
sink.requests.max-inflight
stream
    at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
    at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
    at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
    at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
    ... 36 more
```

  was:
First, I define a table which uses `kinesis` connector with an unsupported property for sink, e.g.
`scan.stream.initpos`:

```
%flink.ssql(type=update)
-- Create input
DROP TABLE IF EXISTS `kds_input`;
CREATE TABLE `kds_input` (
  `some_string` STRING,
  `some_int` BIGINT,
  `time` AS PROCTIME()
) WITH (
  'connector' = 'kinesis',
  'stream' = 'ExampleInputStream',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```
I can read from my table (kds_input) without any issue, but it will throw exception if I try to write to the table:
```
%flink.ssql(type=update)
-- Use to generate data in the input table
DROP TABLE IF EXISTS connector_cve_datagen;
CREATE TABLE connector_cve_datagen(
  `some_string` STRING,
  `some_int` BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.some_string.length' = '2');
INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
```
Exception observed:
```
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
Unsupported options:
scan.stream.initpos
Supported options:
aws.region
connector
csv.allow-comments
csv.array-element-delimiter
csv.disable-quote-character
csv.escape-character
csv.field-delimiter
csv.ignore-parse-errors
csv.null-literal
csv.quote-character
format
property-version
sink.batch.max-size
sink.fail-on-error
sink.flush-buffer.size
sink.flush-buffer.timeout
sink.partitioner
sink.partitioner-field-delimiter
sink.producer.collection-max-count (deprecated)
sink.producer.collection-max-size (deprecated)
sink.producer.fail-on-error (deprecated)
sink.producer.record-max-buffered-time (deprecated)
sink.requests.max-buffered
sink.requests.max-inflight
stream
    at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
    at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
    at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
    at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
    ... 36 more
```


> Table using `kinesis` connector can not be used for both read & write
> operations if it's defined with unsupported sink property
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33181
>                 URL: https://issues.apache.org/jira/browse/FLINK-33181
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis, Table SQL / Runtime
>    Affects Versions: 1.15.4
>            Reporter: Khanh Vu
>            Priority: Major
>
> First, I define a table which uses `kinesis` connector with an unsupported
> property for sink, e.g.
> `scan.stream.initpos`:
>
> ```
> %flink.ssql(type=update)
> -- Create input
> DROP TABLE IF EXISTS `kds_input`;
> CREATE TABLE `kds_input` (
>   `some_string` STRING,
>   `some_int` BIGINT,
>   `time` AS PROCTIME()
> ) WITH (
>   'connector' = 'kinesis',
>   'stream' = 'ExampleInputStream',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'LATEST',
>   'format' = 'csv'
> );
> ```
> I can read from my table (kds_input) without any issue, but it will throw
> exception if I try to write to the table:
> ```
> %flink.ssql(type=update)
> -- Use to generate data in the input table
> DROP TABLE IF EXISTS connector_cve_datagen;
> CREATE TABLE connector_cve_datagen(
>   `some_string` STRING,
>   `some_int` BIGINT
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '1',
>   'fields.some_string.length' = '2');
> INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
> ```
> Exception observed:
> ```
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> options found for 'kinesis'.
> Unsupported options:
> scan.stream.initpos
> Supported options:
> aws.region
> connector
> csv.allow-comments
> csv.array-element-delimiter
> csv.disable-quote-character
> csv.escape-character
> csv.field-delimiter
> csv.ignore-parse-errors
> csv.null-literal
> csv.quote-character
> format
> property-version
> sink.batch.max-size
> sink.fail-on-error
> sink.flush-buffer.size
> sink.flush-buffer.timeout
> sink.partitioner
> sink.partitioner-field-delimiter
> sink.producer.collection-max-count (deprecated)
> sink.producer.collection-max-size (deprecated)
> sink.producer.fail-on-error (deprecated)
> sink.producer.record-max-buffered-time (deprecated)
> sink.requests.max-buffered
> sink.requests.max-inflight
> stream
>     at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
>     at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
>     at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
>     at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
>     at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
>     at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
>     at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
>     ... 36 more
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
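
A possible workaround sketch, not verified against 1.15.4: since the exception lists `connector`, `stream`, `aws.region` and `format` among the options the sink accepts, a second, sink-only table over the same stream that simply omits `scan.stream.initpos` should pass sink validation. The table name `kds_output` is hypothetical; the stream, region and format values are copied from the report.

```
%flink.ssql(type=update)
-- Hypothetical sink-only table: same stream as kds_input, but without any scan.* option
DROP TABLE IF EXISTS `kds_output`;
CREATE TABLE `kds_output` (
  `some_string` STRING,
  `some_int` BIGINT
) WITH (
  'connector' = 'kinesis',
  'stream' = 'ExampleInputStream',
  'aws.region' = 'us-east-1',
  'format' = 'csv'
);
-- Write through the sink-only table; keep reading through kds_input
INSERT INTO kds_output SELECT some_string, some_int FROM connector_cve_datagen;
```

This only sidesteps the validation error by keeping source-only options out of the table used for writing; it does not address the underlying behaviour reported in this issue.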