This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new 8a9db476bd [Fix][Connector-V2] assign size for KafkaSource reader cache queue (#9041)
8a9db476bd is described below

commit 8a9db476bdc37d5e4079491bdb7092b892a17a7b
Author: Jesse <35264598+jessea...@users.noreply.github.com>
AuthorDate: Fri Mar 28 11:16:13 2025 +0800

    [Fix][Connector-V2] assign size for KafkaSource reader cache queue (#9041)
---
 docs/en/connector-v2/source/Kafka.md                              | 2 ++
 docs/zh/connector-v2/source/Kafka.md                              | 3 ++-
 .../connectors/seatunnel/kafka/config/KafkaSourceOptions.java     | 6 ++++++
 .../seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java  | 3 +--
 .../connectors/seatunnel/kafka/source/KafkaSourceConfig.java      | 3 +++
 .../connectors/seatunnel/kafka/source/KafkaSourceFactory.java     | 3 ++-
 6 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
index 90406b0516..bcb145c7b5 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -56,6 +56,8 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details [...]
 | protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name [...]
 | protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition [...]
+| reader_cache_queue_size | Integer | No | 1024 | The reader shard cache queue is used to cache the data corresponding to the shards. The size of the shard cache depends on the number of shards obtained by each reader, rather than the amount of data in each shard. [...]
+| is_native | Boolean | No | false | Supports retaining the source information of the record.
 
 ### debezium_record_table_filter
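For context on what the new reader_cache_queue_size option changes at runtime, here is a minimal, JDK-only sketch (not SeaTunnel code; the class name and "batch" strings are illustrative): a LinkedBlockingQueue created with a capacity blocks or rejects the producing side once that many cached batches are queued, so reader memory is bounded by the configured size instead of growing with whatever the fetcher manages to pull.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BoundedReaderQueueSketch {
        public static void main(String[] args) throws InterruptedException {
            // The capacity plays the role of reader_cache_queue_size (default 1024 in the patch);
            // a tiny value is used here so the effect is visible.
            BlockingQueue<String> elementsQueue = new LinkedBlockingQueue<>(2);

            elementsQueue.put("batch-1"); // cached immediately
            elementsQueue.put("batch-2"); // cached immediately, queue is now full

            // A third batch no longer fits: offer(...) times out and returns false instead of
            // letting the cache grow without limit -- the back-pressure a bounded queue provides.
            boolean accepted = elementsQueue.offer("batch-3", 100, TimeUnit.MILLISECONDS);
            System.out.println("third batch accepted = " + accepted); // false

            // As soon as the reader drains an element, the fetcher can make progress again.
            elementsQueue.take();
            System.out.println("retry accepted = " + elementsQueue.offer("batch-3")); // true
        }
    }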
diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index a516f2b8a4..9005e78016 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -56,7 +56,8 @@ import ChangeLog from '../changelog/connector-kafka.md';
 | common-options | | 否 | - | 源插件的常见参数,详情请参考 [Source Common Options](../source-common-options.md)。 |
 | protobuf_message_name | String | 否 | - | 当格式设置为 protobuf 时有效,指定消息名称。 |
 | protobuf_schema | String | 否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
-| is_native | Boolean | No | false | 支持保留record的源信息。 |
+| reader_cache_queue_size | Integer | 否 | 1024 | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
+| is_native | Boolean | No | false | 支持保留record的源信息。 |
 
 ### debezium_record_table_filter
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
index 29b321f527..35b9573beb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java
@@ -41,6 +41,12 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
                     .withDescription(
                             "Kafka consumer group id, used to distinguish different consumer groups.");
 
+    public static final Option<Integer> READER_CACHE_QUEUE_SIZE =
+            Options.key("reader_cache_queue_size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription("The size of reader queue.");
+
     public static final Option<Boolean> COMMIT_ON_CHECKPOINT =
             Options.key("commit_on_checkpoint")
                     .booleanType()
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index d57bf03e64..c32e9ba2dd 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -78,9 +78,8 @@ public class KafkaSource
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
             SourceReader.Context readerContext) {
-
         BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
-                new LinkedBlockingQueue<>();
+                new LinkedBlockingQueue<>(kafkaSourceConfig.getReaderCacheQueueSize());
         Supplier<KafkaPartitionSplitReader> kafkaPartitionSplitReaderSupplier =
                 () -> new KafkaPartitionSplitReader(kafkaSourceConfig, readerContext);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 022e2d76bd..ef04d747dc 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -88,6 +88,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSource
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PATTERN;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_MESSAGE_NAME;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.READER_CACHE_QUEUE_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
@@ -105,6 +106,7 @@ public class KafkaSourceConfig implements Serializable {
     @Getter private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
     @Getter private final String consumerGroup;
     @Getter private final long pollTimeout;
+    @Getter private final int readerCacheQueueSize;
 
     public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
         this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -116,6 +118,7 @@ public class KafkaSourceConfig implements Serializable {
                 readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
         this.pollTimeout = readonlyConfig.get(KEY_POLL_TIMEOUT);
         this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
+        this.readerCacheQueueSize = readonlyConfig.get(READER_CACHE_QUEUE_SIZE);
     }
 
     private Properties createKafkaProperties(ReadonlyConfig readonlyConfig) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 4215f39a68..14bfbba415 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -57,7 +57,8 @@ public class KafkaSourceFactory implements TableSourceFactory {
                         KafkaSourceOptions.FORMAT,
                         KafkaSourceOptions.DEBEZIUM_RECORD_INCLUDE_SCHEMA,
                         KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER,
-                        KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
+                        KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                        KafkaSourceOptions.READER_CACHE_QUEUE_SIZE)
                 .conditional(
                         KafkaSourceOptions.START_MODE,
                         StartMode.TIMESTAMP,
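Taken together, the patch threads one integer from the job options into the reader's element queue. The following simplified, hypothetical sketch (plain JDK; the map-based config and class name are stand-ins for SeaTunnel's ReadonlyConfig/KafkaSourceConfig) shows the shape of that wiring and the effect of the 1024 default; before this change the queue was created with the no-argument LinkedBlockingQueue constructor, whose capacity is Integer.MAX_VALUE.

    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Simplified stand-in for the KafkaSourceConfig -> KafkaSource#createReader wiring; names are illustrative.
    public class ReaderQueueWiringSketch {

        static final String READER_CACHE_QUEUE_SIZE_KEY = "reader_cache_queue_size";
        static final int READER_CACHE_QUEUE_SIZE_DEFAULT = 1024; // default introduced by the patch

        // Roughly what KafkaSourceConfig now does: read the option, falling back to the default.
        static int readerCacheQueueSize(Map<String, Object> options) {
            Object value = options.get(READER_CACHE_QUEUE_SIZE_KEY);
            return value == null ? READER_CACHE_QUEUE_SIZE_DEFAULT : Integer.parseInt(value.toString());
        }

        // Roughly what KafkaSource#createReader now does: the configured size becomes the queue capacity.
        // Before the patch: new LinkedBlockingQueue<>(), i.e. a capacity of Integer.MAX_VALUE.
        static <T> BlockingQueue<T> createElementsQueue(Map<String, Object> options) {
            return new LinkedBlockingQueue<>(readerCacheQueueSize(options));
        }

        public static void main(String[] args) {
            Map<String, Object> defaults = Map.of();
            Map<String, Object> tuned = Map.of(READER_CACHE_QUEUE_SIZE_KEY, 16);
            System.out.println(createElementsQueue(defaults).remainingCapacity()); // 1024
            System.out.println(createElementsQueue(tuned).remainingCapacity());    // 16
        }
    }

The 1024 default in the sketch mirrors the defaultValue declared for READER_CACHE_QUEUE_SIZE in KafkaSourceOptions above.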