[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309852#comment-16309852
 ] 

Randall Hauch commented on KAFKA-6252:
--------------------------------------

As for how we might more gracefully deal with such connectors, I can think of 
two possible changes:

# ensure that {{poll()}} always returns by interrupting the task; or
# change how the task-specific metrics are created so that any metric group 
already registered for the same connector and task is deregistered first.

Ideally, we'd do the first to make sure every task is always cleaned up. But 
there are quite a few issues with this. First, even though {{poll()}} already 
throws {{InterruptedException}}, interrupting an unresponsive {{poll()}} method 
would change existing behavior. Second, not all connector implementations may 
be expecting an interrupt, and some may fail to handle it properly. Third, we'd 
have to decide how long to wait before interrupting. None of these are 
straightforward.
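
To illustrate the first two concerns, here is a minimal, self-contained sketch in plain Java. It does not use the Connect runtime; {{InterruptSketch}}, {{BlockingTask}}, {{stopWithInterrupt}} and the grace period are hypothetical names invented for this example.

```java
final class InterruptSketch {
    /** Hypothetical stand-in for a task whose poll() may block indefinitely. */
    interface BlockingTask {
        void poll() throws InterruptedException;
    }

    /**
     * Runs task.poll() on its own thread, waits up to graceMs (must be > 0)
     * for it to return, then interrupts the thread and waits up to graceMs again.
     *
     * @return true if the thread finished, false if it is still running
     *         (for example because poll() swallowed the interrupt)
     */
    static boolean stopWithInterrupt(BlockingTask task, long graceMs) {
        Thread worker = new Thread(() -> {
            try {
                task.poll();
            } catch (InterruptedException e) {
                // Well-behaved case: restore the flag and let the thread exit.
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // a stuck thread must not keep the JVM alive
        worker.start();
        try {
            worker.join(graceMs);      // give poll() a chance to return on its own
            if (worker.isAlive()) {
                worker.interrupt();    // the behavior change discussed above
                worker.join(graceMs);  // wait again to see whether it reacted
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }
}
```

A task that lets {{InterruptedException}} propagate stops promptly, whereas one that catches and ignores it inside a loop keeps running after the interrupt, so interrupting alone would not guarantee cleanup.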

So, my current thinking is that the second approach may be a better solution. 
A task-specific metrics group is only duplicated if the connector is restarted 
and the previous tasks don't stop, so when a new task-specific metrics group is 
created it could clean up any previous instance for the same connector and 
task. (Note that any metrics groups that are not specific to a connector and 
task should always be reused.)
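
Roughly, the second approach could look like the sketch below. This is plain Java with a simplified stand-in registry, not the actual {{Metrics}} or {{WorkerTask}} code; {{TaskMetricsSketch}}, {{Registry}}, {{removeByPrefix}} and the name format are assumptions made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskMetricsSketch {
    /** Simplified stand-in for a metrics registry that rejects duplicate names. */
    static final class Registry {
        private final Map<String, Double> metrics = new HashMap<>();

        void register(String name) {
            if (metrics.containsKey(name)) {
                throw new IllegalArgumentException(
                    "A metric named '" + name + "' already exists, can't register another one.");
            }
            metrics.put(name, 0.0);
        }

        /** Removes every metric whose name starts with the given prefix. */
        void removeByPrefix(String prefix) {
            metrics.keySet().removeIf(name -> name.startsWith(prefix));
        }

        int size() {
            return metrics.size();
        }
    }

    /** Hypothetical task-specific group that clears leftovers before registering. */
    static final class TaskMetricsGroup {
        TaskMetricsGroup(Registry registry, String connector, int task) {
            // The closing "}." keeps task=1 from matching task=10.
            String prefix = "connector-task-metrics{connector=" + connector
                + ",task=" + task + "}.";
            // Deregister whatever a previous, never-closed instance left behind.
            registry.removeByPrefix(prefix);
            registry.register(prefix + "offset-commit-max-time-ms");
        }
    }
}
```

With this, constructing a second group for the same connector and task replaces the stale metrics instead of failing with the exception quoted in the issue description, while groups for other tasks are left untouched.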

> A metric named 'XX' already exists, can't register another one.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6252
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>         Environment: Linux
>            Reporter: Alexis Sellier
>            Priority: Critical
>
> When a connector crashes (or is implemented incorrectly and does not 
> stop/interrupt {{poll()}}), it cannot be restarted, and an exception 
> like this is thrown:
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>       at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>       at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>       at 
> org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>       at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>       at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not called in 
> all cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
