Hi, team,

We are using Kafka Connect at present and have encountered a problem with 
version 2.1.0 that all connectors kept restarting when one new connector was 
added into the cluster which then failed to start due to some network problem 
(firewall not open). 
And the Connect daemon failed to serve requests since the herder thread was 
blocked, and we could not delete the failed connector until we restarted the 
Connect process.

We searched for solutions in new versions and found that KIP-415 may be what we 
want since it seems to be able to avoid the stop-the-world behavior when any 
connector change happens.

After test on branch:2.3.0 which includes KIP-415, we found an unexpected 
behavior.

Here are the steps for reproducing the case:

1. setup a Kafka Connect cluster by starting one worker with distributed config 
connecting to an existing Kafka
2. create two topics (say T1, T2) to be the destination for two source 
connectors
3. create a FileStreamSourceConnector (say C1) to write file content to T1 and 
from status API we can see the tasks of C1 are running
4. create another FileStreamSourceConnector (say C2) to write file content to T2

After finishing these steps, what we expected is that all the tasks of C1 will 
not be restarted since they are running independently and they don’t care about 
the new added C2 and its tasks.
But the actual behavior is that all the tasks of C1 will be restarted according 
to the INFO and DEBUG logs.


The log is too long and is shown partly below (C1 named file_sync_1 and C2 
named file_sync_2):
———————————————————————————————————————————————————————————————————————————————————
 —————— 
[2019-06-27 22:55:04,621] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Connector file_sync_2 config updated 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1268)

…

[2019-06-27 22:55:05,134] DEBUG [Worker clientId=connect-1, 
groupId=connect-cluster] Augmented new assignment: Assignment{error=0, 
leader='connect-1-903c66b2-ba68-4e4f-b27f-7cf58ce41f77', 
leaderUrl='http://127.0.0.1:8083/', offset=10, connectorIds=[file_sync_2, 
file_sync_1], taskIds=[file_sync_1-0], revokedConnectorIds=[], 
revokedTaskIds=[], delay=0} 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:198)

…

[2019-06-27 22:55:05,134] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Starting connector file_sync_2 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1075)

…

[2019-06-27 22:55:05,630] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Tasks [file_sync_2-0, file_sync_1-0] configs updated 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1283)

...

[2019-06-27 22:55:06,133] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Handling task config update by restarting tasks 
[file_sync_1-0] 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:498)
[2019-06-27 22:55:06,133] INFO Stopping task file_sync_1-0 
(org.apache.kafka.connect.runtime.Worker:684)

…
———————————————————————————————————————————————————————————————————————————————————
 —————— 



From the log we can see the creation of C2 led to the restart of tasks of C1 
(file_sync_1-0), which means the creation of one connector will have impact on 
all tasks in this one-worker cluster.

We went through the code and found the restart behavior is triggered by the 
following logic:

——————————————————————————————— ———————————————————————————
// 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 Line: 488 ~ 501

private void 
processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId> 
taskConfigUpdates) {
    Set<ConnectorTaskId> localTasks = assignment == null
                                      ? Collections.emptySet()
                                      : new HashSet<>(assignment.tasks());
    Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
            .map(ConnectorTaskId::connector).collect(Collectors.toSet());

    List<ConnectorTaskId> tasksToStop = localTasks.stream()
            .filter(taskId -> 
connectorsWhoseTasksToStop.contains(taskId.connector()))
            .collect(Collectors.toList());
    log.info("Handling task config update by restarting tasks {}", tasksToStop);
    worker.stopAndAwaitTasks(tasksToStop);
    tasksToRestart.addAll(tasksToStop);
}

// The taskConfigUpdates originally comes from 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 Line: 620 ~ 637

    } else {
        if (deferred != null) {
            taskConfigs.putAll(deferred);
            updatedTasks.addAll(taskConfigs.keySet());
        }
        inconsistent.remove(connectorName);
    }
    // Always clear the deferred entries, even if we didn't apply them. If they 
represented an inconsistent
    // update, then we need to see a completely fresh set of configs after this 
commit message, so we don't
    // want any of these outdated configs
    if (deferred != null)
        deferred.clear();

    connectorTaskCounts.put(connectorName, newTaskCount);
}

if (started)
    updateListener.onTaskConfigUpdate(updatedTasks);
——————————————————————————————————————————————————————————— 

As we can see the updatedTasks contain the running tasks of previous assignment.

So the question here is:

Is this an expected behavior by designed? If yes, is it also the same behavior 
when other connector level changes happen such as config update of a connector 
or a connector restart?

Please correct me if I got anything wrong. Any feedback would be greatly 
appreciated. Thank you!

Regards,
Shurong


 

Reply via email to