tongyp created FLINK-37975:
------------------------------

             Summary: Syncing a large number of sharded tables fails in post-transform
                 Key: FLINK-37975
                 URL: https://issues.apache.org/jira/browse/FLINK-37975
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.4.0
            Reporter: tongyp


Environment: Flink CDC 3.4 pipeline + Flink 1.20.1

When I sync about 10,000 sharded tables to Doris, the job fails. The JobManager
log is:

```text
2025-06-17 17:41:25,211 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlSnapshotSplit{tableId=xxx, splitId='xx', splitKeyType=[`id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} to subtask 0
2025-06-17 17:41:26,070 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Transform:Data -> SchemaOperator -> PrePartition (1/1) (219052134362b9a800a3f2246605fbac_90bea66de1c231edf33913ecd54406c1_0_0) switched from RUNNING to FAILED on localhost:44643-f3a1f2 @ localhost (dataPort=40666).
org.apache.flink.util.SerializedThrowable: org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: Failed to post-transform with
CreateTableEvent{xxx}
for table
xx
from schema
columns={xxx)
to schema
columns={xxx).
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:146) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130) ~[?:?]
	... 13 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalStateException: Failed to send request to coordinator: SchemaChangeRequest{xxx}
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:241) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.requestSchemaChange(SchemaOperator.java:227) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleSchemaChangeEvent(SchemaOperator.java:173) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:148) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130) ~[?:?]
	... 13 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterOperatorEventGateway.sendRequestToCoordinator(OperatorID, SerializedValue))] at recipient [pekko.tcp://flink@localhost:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2028) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:238) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.requestSchemaChange(SchemaOperator.java:227) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleSchemaChangeEvent(SchemaOperator.java:173) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:148) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElementInternal(PostTransformOperator.java:175) ~[?:?]
	at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:130) ~[?:?]
	... 13 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterOperatorEventGateway.sendRequestToCoordinator(OperatorID, SerializedValue))] at recipient [pekko.tcp://flink@localhost:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
```
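The innermost causes show the shape of the failure. The SchemaOperator blocks on `CompletableFuture.get()` while it waits for the coordinator's reply to a `SchemaChangeRequest`. When that RPC exceeds the ask timeout, `get()` throws an `ExecutionException` that wraps the `TimeoutException`, and this is rethrown as `IllegalStateException: Failed to send request to coordinator`.

The sketch below is minimal and self-contained. It reproduces only that exception chain, using the JDK's `CompletableFuture.orTimeout`. The class `CoordinatorTimeoutSketch`, the `slowCoordinator` helper, and the millisecond delays are hypothetical stand-ins for the Pekko RPC. This is not Flink CDC's actual `SchemaOperator` code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration of a blocking coordinator request with a per-request timeout. */
public class CoordinatorTimeoutSketch {

    /** Hypothetical coordinator that replies after responseDelayMs milliseconds. */
    static CompletableFuture<String> slowCoordinator(String request, long responseDelayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(responseDelayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "ACK " + request;
        });
    }

    /** Blocks on the reply; a timeout surfaces as ExecutionException(TimeoutException). */
    static String sendRequestToCoordinator(String request, long responseDelayMs, long askTimeoutMs) {
        CompletableFuture<String> response = slowCoordinator(request, responseDelayMs)
                .orTimeout(askTimeoutMs, TimeUnit.MILLISECONDS); // stands in for pekko.ask.timeout
        try {
            return response.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for coordinator", e);
        } catch (ExecutionException e) {
            // Same nesting as the log: IllegalStateException -> ExecutionException -> TimeoutException
            throw new IllegalStateException("Failed to send request to coordinator: " + request, e);
        }
    }

    public static void main(String[] args) {
        // The coordinator replies well within the timeout, so the call returns normally.
        System.out.println(sendRequestToCoordinator("SchemaChangeRequest{t1}", 10, 1_000));

        // The coordinator is slower than the timeout, which gives the exception chain seen above.
        try {
            sendRequestToCoordinator("SchemaChangeRequest{t2}", 1_000, 50);
        } catch (IllegalStateException e) {
            Throwable root = e.getCause().getCause();
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause().getClass().getName()
                    + ", root is timeout: " + (root instanceof TimeoutException));
        }
    }
}
```

In this sketch, the second call fails only because `responseDelayMs` is larger than `askTimeoutMs`. Raising the timeout lets the same call succeed, which mirrors the log's own suggestion to increase `pekko.ask.timeout`.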

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
