This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 451a26e0 [ISSUE #364] Fix worker direct task can not pause (#365)
451a26e0 is described below

commit 451a26e03294be46d81ddad7250402c272ef98bf
Author: Slideee <[email protected]>
AuthorDate: Thu Nov 3 14:06:59 2022 +0800

    [ISSUE #364] Fix worker direct task can not pause (#365)
---
 .../connect/runtime/connectorwrapper/WorkerDirectTask.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 7bba2f97..59641fd2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -171,6 +171,20 @@ public class WorkerDirectTask extends WorkerSourceTask {
     protected void execute() {
         while (isRunning()) {
             updateCommittableOffsets();
+
+            if (shouldPause()) {
+                onPause();
+                try {
+                    // wait unpause
+                    if (awaitUnpause()) {
+                        onResume();
+                    }
+                    continue;
+                } catch (InterruptedException e) {
+                    // do exception
+                }
+            }
+
             try {
                 Collection<ConnectRecord> toSendEntries = sourceTask.poll();
                 if (!toSendEntries.isEmpty()) {

Reply via email to