[ https://issues.apache.org/jira/browse/IGNITE-19910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ilya Shishkov updated IGNITE-19910:
-----------------------------------
    Description: 
Currently, in CDC through Kafka applications, a single timeout property ({{kafkaRequestTimeout}}) is used for all Kafka-related operations instead of the built-in timeouts of the Kafka clients API (moreover, its default value of 3 seconds does not correspond to the Kafka clients' defaults):
||Client||Timeout||Default value, s||
|{{KafkaProducer}}|{{delivery.timeout.ms}}|120|
|{{KafkaProducer}}|{{request.timeout.ms}}|30|
|{{KafkaConsumer}}|{{default.api.timeout.ms}}|60|
|{{KafkaConsumer}}|{{request.timeout.ms}}|30|

The table below lists the places where {{kafkaRequestTimeout}} is _explicitly specified_ as the total operation timeout instead of using the default timeouts:
||CDC application||API||Default timeout||
|ignite-cdc.sh: {{IgniteToKafkaCdcStreamer}}|{{KafkaProducer#send}}|{{delivery.timeout.ms}} \*|
|kafka-to-ignite.sh: {{KafkaToIgniteCdcStreamerApplier}}|{{KafkaConsumer#commitSync}}|{{default.api.timeout.ms}}|
|kafka-to-ignite.sh: {{KafkaToIgniteCdcStreamerApplier}}|{{KafkaConsumer#close}}|{{KafkaConsumer#DEFAULT_CLOSE_TIMEOUT_MS}} (30s)|
|kafka-to-ignite.sh: {{KafkaToIgniteMetadataUpdater}}|{{KafkaConsumer#partitionsFor}}|{{default.api.timeout.ms}}|
|kafka-to-ignite.sh: {{KafkaToIgniteMetadataUpdater}}|{{KafkaConsumer#endOffsets}}|{{request.timeout.ms}}|

\* - the application waits for the future during the specified timeout ({{kafkaRequestTimeout}}), but the future itself fails if the delivery timeout is exceeded.

*Selection of required timeout*

All of the above methods fail with an exception when the specified timeout is exceeded, so the specified timeout *_should not be too low_*.

On the other hand, kafka-to-ignite.sh also invokes {{KafkaConsumer#poll}} with the timeout {{kafkaRequestTimeout}}, but that call simply waits for data until the specified timeout expires. So {{#poll}} should be called quite often and we *_should not set too large a timeout_* for it; otherwise we can face replication delays when some topic partitions have no new data. This is undesirable, because in this case some partitions will wait to be processed.

*Kafka clients request retries*

Each individual request is retried if it exceeds {{request.timeout.ms}} [2, 4]. The minimal number of retries equals the ratio of the total operation timeout to {{request.timeout.ms}}. The total timeout is either an explicitly specified argument of the API method or the default value (described in the tables above).

Consequently, {{kafkaRequestTimeout}} currently has to be N times greater than {{request.timeout.ms}} for request retries to be possible, i.e. the default value of 3 s has to be overridden in the CDC configuration.

*Conclusion*
# It seems that the better approach is to rely on the Kafka clients' timeouts, because they provide everything necessary to perform retries and handle timeout issues.
# {{kafkaRequestTimeout}} should be used only for {{KafkaConsumer#poll}}; its default value of 3 s can remain the same.
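
The retry arithmetic described above can be sketched in Java as follows. This is only an illustration under stated assumptions: the class {{CdcKafkaTimeouts}} and its methods are hypothetical and belong neither to the Ignite CDC extension nor to the Kafka clients API. The timeout values are the defaults quoted in the tables above, and the sketch only evaluates the ratio of the total timeout to {{request.timeout.ms}}; it does not model the clients' actual retry logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, for illustration only; not part of Ignite or Kafka. */
final class CdcKafkaTimeouts {
    /** Current default of kafkaRequestTimeout in the CDC configuration, ms. */
    static final long KAFKA_REQUEST_TIMEOUT_MS = 3_000;

    /** Default of request.timeout.ms for both Kafka clients, ms. */
    static final long REQUEST_TIMEOUT_MS = 30_000;

    /** Integer ratio of the total operation timeout to request.timeout.ms. */
    static long requestAttempts(long totalTimeoutMs, long requestTimeoutMs) {
        if (totalTimeoutMs < 0 || requestTimeoutMs <= 0)
            throw new IllegalArgumentException(
                "totalTimeoutMs must be >= 0 and requestTimeoutMs must be > 0");

        return totalTimeoutMs / requestTimeoutMs;
    }

    /** Total operation timeouts (ms) compared in this sketch, keyed by a label. */
    static Map<String, Long> totalTimeouts() {
        Map<String, Long> timeouts = new LinkedHashMap<>();

        timeouts.put("kafkaRequestTimeout (3 s)", KAFKA_REQUEST_TIMEOUT_MS);
        timeouts.put("default.api.timeout.ms (60 s)", 60_000L);
        timeouts.put("delivery.timeout.ms (120 s)", 120_000L);

        return timeouts;
    }

    public static void main(String[] args) {
        // Print how many full request timeouts fit into each total timeout.
        totalTimeouts().forEach((name, totalMs) ->
            System.out.println(name + " -> "
                + requestAttempts(totalMs, REQUEST_TIMEOUT_MS)
                + " full request timeout(s) fit"));
    }
}
```

With the current 3 s default the ratio is 0, i.e. not even one full request timeout fits into the total timeout, whereas the Kafka client defaults of 60 s and 120 s give ratios of 2 and 4.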
----
Links:
# https://kafka.apache.org/27/documentation.html#producerconfigs_delivery.timeout.ms
# https://kafka.apache.org/27/documentation.html#producerconfigs_request.timeout.ms
# https://kafka.apache.org/27/documentation.html#consumerconfigs_default.api.timeout.ms
# https://kafka.apache.org/27/documentation.html#consumerconfigs_request.timeout.ms

  was:
Currently, in CDC through Kafka applications, a single timeout property ({{kafkaRequestTimeout}}) is used for all Kafka-related operations instead of the built-in timeouts of the Kafka clients API (moreover, its default value of 3 seconds does not correspond to the Kafka clients' defaults):
||Client||Timeout||Default value, s||
|{{KafkaProducer}}|{{delivery.timeout.ms}}|120|
|{{KafkaProducer}}|{{request.timeout.ms}}|30|
|{{KafkaConsumer}}|{{default.api.timeout.ms}}|60|
|{{KafkaConsumer}}|{{request.timeout.ms}}|30|

The table below lists the places where {{kafkaRequestTimeout}} is _explicitly specified_ as the total operation timeout instead of using the default timeouts:
||CDC application||API||Default timeout||
|ignite-cdc.sh: {{IgniteToKafkaCdcStreamer}}|{{KafkaProducer#send}}|{{delivery.timeout.ms}} \*|
|kafka-to-ignite.sh: {{KafkaToIgniteCdcStreamerApplier}}|{{KafkaConsumer#commitSync}}|{{default.api.timeout.ms}}|
|kafka-to-ignite.sh: {{KafkaToIgniteCdcStreamerApplier}}|{{KafkaConsumer#close}}|{{KafkaConsumer#DEFAULT_CLOSE_TIMEOUT_MS}} (30s)|
|kafka-to-ignite.sh: {{KafkaToIgniteMetadataUpdater}}|{{KafkaConsumer#partitionsFor}}|{{default.api.timeout.ms}}|
|kafka-to-ignite.sh: {{KafkaToIgniteMetadataUpdater}}|{{KafkaConsumer#endOffsets}}|{{request.timeout.ms}}|

\* - the application waits for the future during the specified timeout ({{kafkaRequestTimeout}}), but the future itself fails if the delivery timeout is exceeded.

*Selection of required timeout*

All of the above methods fail with an exception when the specified timeout is exceeded, so the timeout *_should not be too low for them_*.

On the other hand, kafka-to-ignite.sh also invokes {{KafkaConsumer#poll}} with the timeout {{kafkaRequestTimeout}}, but that call simply waits for data until the specified timeout expires. So {{#poll}} should be called quite often and we *_should not set too large a timeout_* for it; otherwise we can face replication delays when some topic partitions have no new data. This is undesirable, because in this case some partitions will wait to be processed.

*Kafka clients request retries*

Each individual request is retried if it exceeds {{request.timeout.ms}} [2, 4]. The minimal number of retries equals the ratio of the total operation timeout to {{request.timeout.ms}}. The total timeout is either an explicitly specified argument of the API method or the default value (described in the tables above).

Consequently, {{kafkaRequestTimeout}} currently has to be N times greater than {{request.timeout.ms}} for request retries to be possible, i.e. the default value of 3 s has to be overridden in the CDC configuration.

*Conclusion*
# It seems that the better approach is to rely on the Kafka clients' timeouts, because they provide everything necessary to perform retries and handle timeout issues.
# {{kafkaRequestTimeout}} should be used only for {{KafkaConsumer#poll}}; its default value of 3 s can remain the same.
----
Links:
# https://kafka.apache.org/27/documentation.html#producerconfigs_delivery.timeout.ms
# https://kafka.apache.org/27/documentation.html#producerconfigs_request.timeout.ms
# https://kafka.apache.org/27/documentation.html#consumerconfigs_default.api.timeout.ms
# https://kafka.apache.org/27/documentation.html#consumerconfigs_request.timeout.ms


> CDC through Kafka: refactor timeouts
> ------------------------------------
>
>                 Key: IGNITE-19910
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19910
>             Project: Ignite
>          Issue Type: Task
>          Components: extensions
>            Reporter: Ilya Shishkov
>            Priority: Minor
>              Labels: IEP-59, ise
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)