Khanh Vu created FLINK-33181:
--------------------------------

             Summary: Table using `kinesis` connector can not be used for both 
read & write operations if it's defined with unsupported sink property
                 Key: FLINK-33181
                 URL: https://issues.apache.org/jira/browse/FLINK-33181
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis, Table SQL / Runtime
    Affects Versions: 1.15.4
            Reporter: Khanh Vu


First, I define a table that uses the `kinesis` connector with a property that is not supported for the sink, e.g. `scan.stream.initpos`:

```
%flink.ssql(type=update)

-- Create input

DROP TABLE IF EXISTS `kds_input`;
CREATE TABLE `kds_input` (
  `some_string` STRING,
  `some_int` BIGINT,
  `time` AS PROCTIME()
) WITH (
  'connector' = 'kinesis',
  'stream' = 'ExampleInputStream',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

I can read from the table (`kds_input`) without any issue, but an exception is thrown when I try to write to it:
```
%flink.ssql(type=update)
-- Used to generate data for the input table

DROP TABLE IF EXISTS connector_cve_datagen;
CREATE TABLE connector_cve_datagen(
  `some_string` STRING,
  `some_int` BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.some_string.length' = '2'
);
INSERT INTO kds_input SELECT some_string, some_int FROM connector_cve_datagen;
```

Exception observed:
```
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options 
found for 'kinesis'.

Unsupported options:

scan.stream.initpos

Supported options:

aws.region
connector
csv.allow-comments
csv.array-element-delimiter
csv.disable-quote-character
csv.escape-character
csv.field-delimiter
csv.ignore-parse-errors
csv.null-literal
csv.quote-character
format
property-version
sink.batch.max-size
sink.fail-on-error
sink.flush-buffer.size
sink.flush-buffer.timeout
sink.partitioner
sink.partitioner-field-delimiter
sink.producer.collection-max-count (deprecated)
sink.producer.collection-max-size (deprecated)
sink.producer.fail-on-error (deprecated)
sink.producer.record-max-buffered-time (deprecated)
sink.requests.max-buffered
sink.requests.max-inflight
stream
        at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
        at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
        at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
        at 
org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
        ... 36 more
```
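For context on why validation fails only on the write path: per the stack trace, `KinesisDynamicTableSinkFactory.createDynamicTableSink` calls `FactoryUtil`'s `validateExcept`/`validateUnconsumedKeys`, which rejects any table option the sink factory did not consume. The following stdlib-only Java sketch is a simplified model of that check (an assumption for illustration, not Flink's actual implementation; the class and option sets are hypothetical) showing how a scan-only option left unconsumed by the sink factory triggers the exception:

```
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ValidationSketch {

    // Simplified model of FactoryUtil.validateUnconsumedKeys: any option
    // not consumed by the factory is reported as unsupported.
    static void validateUnconsumedKeys(String factoryIdentifier,
                                       Set<String> allOptions,
                                       Set<String> consumedOptions) {
        List<String> unsupported = allOptions.stream()
                .filter(k -> !consumedOptions.contains(k))
                .sorted()
                .collect(Collectors.toList());
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException(
                    "Unsupported options found for '" + factoryIdentifier + "'.\n\n"
                            + "Unsupported options:\n\n"
                            + String.join("\n", unsupported));
        }
    }

    public static void main(String[] args) {
        // All options declared on the table in the bug report.
        Set<String> tableOptions = new HashSet<>(Arrays.asList(
                "connector", "stream", "aws.region", "format", "scan.stream.initpos"));
        // The sink factory only consumes sink/format-related options, so the
        // scan-only option survives the filter and triggers the exception.
        Set<String> sinkConsumed = new HashSet<>(Arrays.asList(
                "connector", "stream", "aws.region", "format"));
        try {
            validateUnconsumedKeys("kinesis", tableOptions, sinkConsumed);
        } catch (IllegalArgumentException e) {
            // Prints an "Unsupported options" message naming scan.stream.initpos
            System.out.println(e.getMessage());
        }
    }
}
```

The source factory consumes `scan.*` options, which is why reading from the same table succeeds.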


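A possible workaround until this is fixed (an assumption on my part, not a confirmed solution): keep scan-only options out of the table definition used for writes, e.g. by defining a second table over the same stream without `scan.stream.initpos` (the table name below is hypothetical):

```
-- Hypothetical workaround: a write-only table over the same stream,
-- declared without the scan-only option.
DROP TABLE IF EXISTS `kds_input_sink`;
CREATE TABLE `kds_input_sink` (
  `some_string` STRING,
  `some_int` BIGINT
) WITH (
  'connector' = 'kinesis',
  'stream' = 'ExampleInputStream',
  'aws.region' = 'us-east-1',
  'format' = 'csv'
);
```

This duplicates the schema, which is exactly why a single table usable for both read and write would be preferable.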

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
