[
https://issues.apache.org/jira/browse/FLINK-40051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093740#comment-18093740
]
Qiu Yanjun commented on FLINK-40051:
------------------------------------
Created PR: https://github.com/apache/flink-cdc/pull/4460
> erorr: No coordinator registered for operator when set operator.uid.prefix
> in mysql to hudi pipeline yaml
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-40051
> URL: https://issues.apache.org/jira/browse/FLINK-40051
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: 1.20.5
> Environment: hadoop: 3.0.0-cdh6.2.0 (jdk8)
> hive: 2.1.1-cdh6.2.0(jdk8)
>
> flink 1.20.5 (jdk11)
> flink-cdc: 3.6.0 (jdk11)
> hudi: 1.1.1(jdk11)
>
> jobmanager and taskmanger runtime jdk: jdk11
> Reporter: lisa
> Priority: Major
> Labels: pull-request-available
>
> error message:
> ```
> 2026-07-02 15:58:13,538 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> flush_event_alignment -> Map -> multi_table_write -> Flat Map -> Sink Writer:
> Hudi Sink (1/1)
> (e3b354fc809643b605c0cefc06b8ae08_02a786a3af36d73d4b8a95dab13f5e8d_0_2)
> switched from RUNNING to FAILED on container_e17_1767939498468_0254_01_000002
> @ your_machine.com (dataPort=37289).
> org.apache.flink.util.FlinkException: No coordinator registered for operator
> 9899a42c64d67ef3172b7e3be3c1bbb9
> at
> org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:120)
> ~[flink-dist-1.20.5.jar:1.20.5]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1099)
> ~[flink-dist-1.20.5.jar:1.20.5]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:638)
> ~[flink-dist-1.20.5.jar:1.20.5]
> at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
> ~[?:?]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:?]
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
> ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> ~[flink-dist-1.20.5.jar:1.20.5]
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
> ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
> ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
> ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
> ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
> [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> [?:?]
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> [?:?]
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> [?:?]
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> [?:?]
> ```
>
> (1) after remove the config "operator.uid.prefix: operator.uid.prefix:
> mysql_hudi_user_behavior" from yaml file the job runs ok ,
> (2) the uid 9899a42c64d67ef3172b7e3be3c1bbb9 would not change, even if i
> restart the job or change the "mysql_hudi_user_behavior" to
> "mysql_hudi_user_behavior1". the uid 9899a42c64d67ef3172b7e3be3c1bbb9 would
> not change at all.
>
> pipeline job file:
>
> ```yaml
> source:
> type: mysql
> hostname: 10.192.100.01
> port: 3306
> username: root
> password: qaz123
> tables: test.user_behavior
> server-id: 5400-5403
> server-time-zone: Asia/Shanghai
> scan.startup.mode: initial
> heartbeat.interval: 30s
> sink:
> type: hudi
> name: Hudi Sink
> path: "hdfs://mycluster/apps/hive/warehouse/managed/dc"
> hoodie.table.type: MERGE_ON_READ
> transform:
> - source-table: test.user_behavior
> table-options:
> ordering.fields: ts
> pipeline:
> name: "MySQL to Hudi Pipeline"
> parallelism: 1
> operator.uid.prefix: mysql_hudi_user_behavior
> ```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)