[ 
https://issues.apache.org/jira/browse/FLINK-40051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18093740#comment-18093740
 ] 

Qiu Yanjun commented on FLINK-40051:
------------------------------------

Created PR: https://github.com/apache/flink-cdc/pull/4460


> Error: No coordinator registered for operator when operator.uid.prefix is set
> in MySQL to Hudi pipeline YAML
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40051
>                 URL: https://issues.apache.org/jira/browse/FLINK-40051
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 1.20.5
>         Environment: hadoop: 3.0.0-cdh6.2.0 (jdk8)
> hive: 2.1.1-cdh6.2.0(jdk8)
>  
> flink 1.20.5 (jdk11)
> flink-cdc: 3.6.0 (jdk11)
> hudi: 1.1.1(jdk11)
>  
> jobmanager and taskmanager runtime jdk: jdk11
>            Reporter: lisa
>            Priority: Major
>              Labels: pull-request-available
>
> error message:
> ```
> 2026-07-02 15:58:13,538 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - flush_event_alignment -> Map -> multi_table_write -> Flat Map -> Sink Writer: Hudi Sink (1/1) (e3b354fc809643b605c0cefc06b8ae08_02a786a3af36d73d4b8a95dab13f5e8d_0_2) switched from RUNNING to FAILED on container_e17_1767939498468_0254_01_000002 @ your_machine.com (dataPort=37289).
> org.apache.flink.util.FlinkException: No coordinator registered for operator 9899a42c64d67ef3172b7e3be3c1bbb9
>     at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:120) ~[flink-dist-1.20.5.jar:1.20.5]
>     at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1099) ~[flink-dist-1.20.5.jar:1.20.5]
>     at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:638) ~[flink-dist-1.20.5.jar:1.20.5]
>     at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source) ~[?:?]
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.20.5.jar:1.20.5]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
> ```
>  
> (1) After removing the config "operator.uid.prefix: mysql_hudi_user_behavior"
> from the YAML file, the job runs OK.
> (2) The uid 9899a42c64d67ef3172b7e3be3c1bbb9 in the error message does not
> change at all, even if I restart the job or change
> "mysql_hudi_user_behavior" to "mysql_hudi_user_behavior1".
>  
> pipeline job file:
>  
> ```yaml
> source:
>   type: mysql
>   hostname: 10.192.100.01
>   port: 3306
>   username: root
>   password: qaz123
>   tables: test.user_behavior
>   server-id: 5400-5403
>   server-time-zone: Asia/Shanghai
>   scan.startup.mode: initial
>   heartbeat.interval: 30s
> sink:
>   type: hudi
>   name: Hudi Sink
>   path: "hdfs://mycluster/apps/hive/warehouse/managed/dc"
>   hoodie.table.type: MERGE_ON_READ
> transform:
>   - source-table: test.user_behavior
>     table-options:
>       ordering.fields: ts
> pipeline:
>   name: "MySQL to Hudi Pipeline"
>   parallelism: 1
>   operator.uid.prefix: mysql_hudi_user_behavior
> ```
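>  
> For reference, a minimal sketch of the workaround from (1), assuming the
> rest of the job file stays exactly as above: the same pipeline block with
> the operator.uid.prefix line removed. This only avoids the failure and is
> not a fix. Without explicit uids, Flink auto-generates operator IDs, which
> may make restoring state harder if the job topology changes later.
>  
> ```yaml
> pipeline:
>   name: "MySQL to Hudi Pipeline"
>   parallelism: 1
>   # operator.uid.prefix is intentionally omitted here (workaround only)
> ```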



