Khanh Vu created FLINK-33181: -------------------------------- Summary: Table using `kinesis` connector can not be used for both read & write operations if it's defined with unsupported sink property Key: FLINK-33181 URL: https://issues.apache.org/jira/browse/FLINK-33181 Project: Flink Issue Type: Bug Components: Connectors / Kinesis, Table SQL / Runtime Affects Versions: 1.15.4 Reporter: Khanh Vu
First, I define a table which uses `kinesis` connector with an unsupported property for sink, e.g. `scan.stream.initpos`: ``` %flink.ssql(type=update) -- Create input DROP TABLE IF EXISTS `kds_input`; CREATE TABLE `kds_input` ( `some_string` STRING, `some_int` BIGINT, `time` AS PROCTIME() ) WITH ( 'connector' = 'kinesis', 'stream' = 'ExampleInputStream', 'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' ); ``` I can read from my table (kds_input) without any issue, but it will throw exception if I try to write to the table: ``` %flink.ssql(type=update) -- Use to generate data in the input table DROP TABLE IF EXISTS connector_cve_datagen; CREATE TABLE connector_cve_datagen( `some_string` STRING, `some_int` BIGINT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.some_string.length' = '2'); INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen ``` Exception observed: ``` Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'. Unsupported options: scan.stream.initpos Supported options: aws.region connector csv.allow-comments csv.array-element-delimiter csv.disable-quote-character csv.escape-character csv.field-delimiter csv.ignore-parse-errors csv.null-literal csv.quote-character format property-version sink.batch.max-size sink.fail-on-error sink.flush-buffer.size sink.flush-buffer.timeout sink.partitioner sink.partitioner-field-delimiter sink.producer.collection-max-count (deprecated) sink.producer.collection-max-size (deprecated) sink.producer.fail-on-error (deprecated) sink.producer.record-max-buffered-time (deprecated) sink.requests.max-buffered sink.requests.max-inflight stream at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624) at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978) at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978) at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259) ... 36 more ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)