This is an automated email from the ASF dual-hosted git repository.

sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

The following commit(s) were added to refs/heads/master by this push:
     new 98748109 [ISSUE #314] Worker sink task bug fix (#315)
98748109 is described below

commit 987481090d0daacdb0021b1c32ebc1a4d83988e4
Author: zhoubo <877036...@qq.com>
AuthorDate: Tue Sep 13 10:46:04 2022 +0800

    [ISSUE #314] Worker sink task bug fix (#315)

    * consume.from.where formate
    checkAndStopConnectors bug fix
    remove sinkTask.flush
    removeAndCloseMessageQueue topic bug fix
    removeMessageQueues filter the same topic
    sinkTaskContext.getPausedQueues().retainAll messageQueues

    * remove duplicate connectors.remove & optimize consumer setMessageQueueListener
---
 .../connect/runtime/config/ConnectorConfig.java    |  3 ++
 .../connect/runtime/connectorwrapper/Worker.java   | 10 +++-
 .../runtime/connectorwrapper/WorkerSinkTask.java   | 59 ++++++++++++----------
 .../runtime/connectorwrapper/WorkerTask.java       |  7 ++-
 4 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
index cf0f17af..58310fbf 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java
@@ -95,6 +95,9 @@ public class ConnectorConfig {
     public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
     public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
 
+    public static final String CONSUME_FROM_WHERE = "consume.from.where";
+
+
     /**
      * The required key for all configurations.
*/ diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index bcd889b4..4ba7baff 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -244,12 +244,18 @@ public class Worker { * @param assigns */ private void checkAndStopConnectors(Collection<String> assigns) { - if (assigns.isEmpty()) { + if (CollectionUtils.isEmpty(assigns)) { // delete all - assigns = connectors.keySet(); + Set<String> connectors = this.connectors.keySet(); + for (String connector : connectors) { + log.info("It may be that the load balancing assigns this connector to other nodes,connector {}", connector); + stopAndAwaitConnector(connector); + } + return; } for (String connectorName : assigns) { if (!assigns.contains(connectorName)) { + log.info("It may be that the load balancing assigns this connector to other nodes,connector {}", connectorName); stopAndAwaitConnector(connectorName); } } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java index 14dc9cbc..c4fbc095 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -304,7 +304,6 @@ public class WorkerSinkTask extends WorkerTask { } finally { if (closing) { log.trace("{} Closing the task before committing the offsets: {}", this, offsetsToCommit); - sinkTask.flush(taskProvidedRecordOffsets); } } if (taskProvidedOffsets.isEmpty()) { @@ 
-574,34 +573,37 @@ public class WorkerSinkTask extends WorkerTask { for (String topic : topics) { consumer.setPullBatchSize(MAX_MESSAGE_NUM); consumer.subscribe(topic, "*"); - if (messageQueueListener == null) { - messageQueueListener = consumer.getMessageQueueListener(); - } - consumer.setMessageQueueListener(new MessageQueueListener() { - @Override - public void messageQueueChanged(String subTopic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { - // update assign message queue - messageQueueListener.messageQueueChanged(subTopic, mqAll, mqDivided); - // listener message queue changed - log.info("Message queue changed start, old message queues offset {}", JSON.toJSONString(messageQueues)); - - if (isStopping()) { - log.trace("Skipping partition revocation callback as task has already been stopped"); - return; - } - // remove and close message queue - removeAndCloseMessageQueue(topic, mqDivided); + } + if (messageQueueListener == null) { + messageQueueListener = consumer.getMessageQueueListener(); + } + consumer.setMessageQueueListener(new MessageQueueListener() { + @Override + public void messageQueueChanged(String subTopic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + // update assign message queue + messageQueueListener.messageQueueChanged(subTopic, mqAll, mqDivided); + // listener message queue changed + log.info("Message queue changed start, old message queues offset {}", JSON.toJSONString(messageQueues)); + + if (isStopping()) { + log.trace("Skipping partition revocation callback as task has already been stopped"); + return; + } + // remove and close message queue + log.info("Task {},MessageQueueChanged, old messageQueuesOffsetMap {}", id.toString(), JSON.toJSONString(messageQueues)); + removeAndCloseMessageQueue(subTopic, mqDivided); - // add new message queue - assignMessageQueue(mqDivided); - preCommit(); - log.info("Message queue changed start, new message queues offset {}", JSON.toJSONString(messageQueues)); + // add new message 
queue + assignMessageQueue(mqDivided); + log.info("Task {}, Message queue changed end, new message queues offset {}", id, JSON.toJSONString(messageQueues)); + preCommit(); + log.info("Message queue changed start, new message queues offset {}", JSON.toJSONString(messageQueues)); - } - }); - } + } + }); consumer.start(); } catch (MQClientException e) { + log.error("Task {},InitializeAndStart MQClientException", id.toString(), e); throw new ConnectException(e); } log.info("Sink task consumer start. taskConfig {}", JSON.toJSONString(taskConfig)); @@ -626,7 +628,7 @@ public class WorkerSinkTask extends WorkerTask { } } // filter not contains in messageQueues - removeMessageQueues = messageQueues.stream().filter(messageQueue -> !queues.contains(messageQueue)).collect(Collectors.toSet()); + removeMessageQueues = messageQueues.stream().filter(messageQueue -> topic.equals(messageQueue.getTopic()) && !queues.contains(messageQueue)).collect(Collectors.toSet()); if (removeMessageQueues == null || removeMessageQueues.isEmpty()) { return; } @@ -696,8 +698,9 @@ public class WorkerSinkTask extends WorkerTask { resumeAll(); } // reset - sinkTaskContext.getPausedQueues().retainAll(queues); + sinkTaskContext.getPausedQueues().retainAll(messageQueues); if (shouldPause()) { + pauseAll(); return; } if (!sinkTaskContext.getPausedQueues().isEmpty()) { @@ -725,7 +728,7 @@ public class WorkerSinkTask extends WorkerTask { } if (offset < 0) { - String consumeFromWhere = taskConfig.getString("consume-from-where"); + String consumeFromWhere = taskConfig.getString(ConnectorConfig.CONSUME_FROM_WHERE); if (StringUtils.isBlank(consumeFromWhere)) { consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name(); } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java index 9dace233..90e7fba5 100644 --- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java @@ -17,6 +17,9 @@ package org.apache.rocketmq.connect.runtime.connectorwrapper; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; import org.apache.rocketmq.connect.runtime.config.WorkerConfig; import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus; @@ -27,10 +30,6 @@ import org.apache.rocketmq.connect.runtime.utils.CurrentTaskState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - /** * Should we use callable here ? */