[ 
https://issues.apache.org/jira/browse/FLINK-38862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-38862:
-----------------------------------
    Labels: connector kafka-source pull-request-available table-api  (was: 
connector kafka-source table-api)

> Partition discovery interval not configurable in upsert mode (tableAPI)
> -----------------------------------------------------------------------
>
>                 Key: FLINK-38862
>                 URL: https://issues.apache.org/jira/browse/FLINK-38862
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: kafka-4.0.1
>         Environment:  
>  
>            Reporter: Efrat Levitan
>            Assignee: Efrat Levitan
>            Priority: Minor
>              Labels: connector, kafka-source, pull-request-available, 
> table-api
>
> While it is possible to pass `scan.topic-partition-discovery.interval` to the 
> Table API connector in append mode (connector=kafka), upsert mode 
> (connector=upsert-kafka) fails with a validation error:
> ```
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported 
> options found for 'upsert-kafka'.
> Unsupported options:
> scan.topic-partition-discovery.interval
> Supported options:
> connector
> key.fields-prefix
> key.format
> key.test-format.changelog-mode
> key.test-format.delimiter
> key.test-format.deprecated-delimiter (deprecated)
> key.test-format.fail-on-missing
> key.test-format.fallback-fail-on-missing
> key.test-format.readable-metadata
> properties.bootstrap.servers
> property-version
> scan.bounded.mode
> scan.bounded.specific-offsets
> scan.bounded.timestamp-millis
> scan.parallelism
> scan.watermark.alignment.group
> scan.watermark.alignment.max-drift
> scan.watermark.alignment.update-interval
> scan.watermark.emit.strategy
> scan.watermark.idle-timeout
> sink.buffer-flush.interval
> sink.buffer-flush.max-rows
> sink.delivery-guarantee
> sink.parallelism
> sink.transaction-naming-strategy
> sink.transactional-id-prefix
> topic
> topic-pattern
> value.fields-include
> value.format
> value.test-format.changelog-mode
> value.test-format.delimiter
> value.test-format.deprecated-delimiter (deprecated)
> value.test-format.fail-on-missing
> value.test-format.fallback-fail-on-missing
> value.test-format.readable-metadata
> ```
> However, this does not mean partition discovery is disabled in upsert mode, 
> only that the interval cannot be overridden and [falls 
> back|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L166-L170]
>  to the default config ([currently 
> 5m|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java#L42]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
