sollhui commented on code in PR #38474:
URL: https://github.com/apache/doris/pull/38474#discussion_r1703509867
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java:
##########
@@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String>
jobProperties,
customKafkaProperties =
dataSourceProperties.getCustomKafkaProperties();
}
- // modify partition offset first
- if (!kafkaPartitionOffsets.isEmpty()) {
- // we can only modify the partition that is being consumed
- ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
- }
-
+ // convertCustomProperties and check partitions before reset
progress to make modify operation atomic
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
- // modify broker list and topic
- if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
- this.brokerList = dataSourceProperties.getBrokerList();
+
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ ((KafkaProgress)
progress).checkPartitions(kafkaPartitionOffsets);
}
+
+ // It is necessary to reset the Kafka progress cache if topic
change,
+ // and should reset cache before modifying partition offset.
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
+ if (Config.isCloudMode()) {
+ resetCloudProgress();
+ }
this.topic = dataSourceProperties.getTopic();
+ this.progress = new KafkaProgress();
+ }
+
+ // modify partition offset
+ if (!kafkaPartitionOffsets.isEmpty()) {
+ // we can only modify the partition that is being consumed
+ ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
Review Comment:
We need, and will fix it in next pr
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]