Jiyong Wang created FLINK-40040:
-----------------------------------

             Summary: SchemaRegistry loses coordination responses when a 
failure's cause is a user-classloader-only class (e.g. a JDBC driver 
exception), causing the job to hang until rpcTimeout and enter a restart loop
                 Key: FLINK-40040
                 URL: https://issues.apache.org/jira/browse/FLINK-40040
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
            Reporter: Jiyong Wang


When the schema-evolution coordinator ({{SchemaRegistry}}) completes a 
coordination response exceptionally, the exception crosses the 
operator-coordinator RPC boundary and is deserialized on the {{SchemaOperator}} 
side by {{flink-rpc-akka}}, which uses an *isolated classloader*. If the 
exception's cause chain contains a class that only exists in the user 
classloader, deserialization fails with {{ClassNotFoundException}} and the 
response is dropped: the RPC layer logs the error, but nothing reaches the 
waiting caller.

Observed in production with Flink CDC 3.6.0 on Flink 1.20.3:

{noformat}
ERROR org.apache.pekko.remote.Remoting - 
com.mysql.cj.exceptions.ConnectionIsClosedException
java.lang.ClassNotFoundException: 
com.mysql.cj.exceptions.ConnectionIsClosedException
    at org.apache.pekko.util.ClassLoaderObjectInputStream.resolveClass(...)
    at ...MiscMessageSerializer.deserializeStatusFailure(...)
{noformat}

Because the coordination response never arrives, 
{{SchemaOperator.sendRequestToCoordinator}} blocks on 
{{responseFuture.get(rpcTimeout)}} and fails ~3 minutes later with a 
*misleading* {{TimeoutException}}:

{noformat}
IllegalStateException: Failed to send request to coordinator: 
SchemaChangeRequest{...AddColumnEvent...}
Caused by: java.util.concurrent.TimeoutException
    at ...SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:243)
{noformat}
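
Why the caller sees only a timeout can be illustrated with plain JDK futures. This is a minimal sketch, not Flink code: {{LostResponseDemo}} and {{await}} are hypothetical names, and the 50 ms timeout stands in for {{rpcTimeout}}. A future that is never completed (the dropped response) yields a bare {{TimeoutException}}, whereas a future that is completed exceptionally would have carried the real cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; it only mimics the blocking wait described above.
final class LostResponseDemo {

    /** Waits like a blocking caller would and reports which kind of failure it saw. */
    static String await(CompletableFuture<String> responseFuture, long timeoutMillis) {
        try {
            return "ok: " + responseFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Nothing arrived: the caller learns only that time ran out.
            return "TimeoutException";
        } catch (ExecutionException e) {
            // A failure arrived: the real cause is available to the caller.
            return "ExecutionException: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "InterruptedException";
        }
    }

    public static void main(String[] args) {
        // Response dropped in transit: the future is never completed.
        CompletableFuture<String> dropped = new CompletableFuture<>();
        System.out.println(await(dropped, 50));

        // Response delivered as a failure: the cause survives.
        CompletableFuture<String> delivered = new CompletableFuture<>();
        delivered.completeExceptionally(new IllegalStateException("connection closed"));
        System.out.println(await(delivered, 50));
    }
}
```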

The job then restarts and hits the same condition again; it recovers only 
after a *full job restart*. A transient error is thus amplified into a 
permanent restart loop, and the real cause is hidden behind a 
{{TimeoutException}}.

h3. Root cause (confirmed)

{{SchemaRegistry.failJob}} passes the original {{Throwable}} to 
{{completeExceptionally(...)}} and {{context.failJob(...)}}. When that 
throwable (or any element of its cause chain) is a user-classloader-only type, 
it cannot be deserialized by the isolated RPC classloader on the receiving side.

h3. Trigger in this incident (inferred -- JM logs no longer available)

The {{ConnectionIsClosedException}} most likely originated from the 
JobManager-side {{MySqlSourceEnumerator}} during table discovery / chunk 
splitting (the only JM component that holds a MySQL connection), then 
propagated across the coordinator RPC boundary. The exact producing site could 
not be pinned down without JM logs, but the deserialization failure itself is 
fully evidenced by the TaskManager stack above.

h3. Fix

Wrap the failure into {{org.apache.flink.util.SerializedThrowable}} in 
{{SchemaRegistry.failJob}} before it crosses the RPC boundary. 
{{SerializedThrowable}} carries the original exception as bytes plus a 
stringified stack trace, so the receiving side can deserialize it without the 
original class and still see the real cause. {{runInEventLoop}} is routed 
through {{failJob}} so all exit paths share the wrapping; this covers both the 
regular and distributed topologies.
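
The "single exit path" idea can be sketched with JDK classes only. This is not the actual patch: {{CoordinatorSketch}}, {{newPendingResponse}} and {{toPortable}} are hypothetical names, and {{toPortable}} is a simplified stand-in for wrapping into {{SerializedThrowable}} (it keeps only the stack trace as text). The point is that {{runInEventLoop}} delegates every failure to {{failJob}}, so the wrapping happens in exactly one place.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a coordinator whose failures all leave through failJob.
final class CoordinatorSketch {

    interface ThrowingRunnable {
        void run() throws Exception;
    }

    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    CompletableFuture<String> newPendingResponse() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    /** Runs an action; any exception is routed through failJob, never completed directly. */
    void runInEventLoop(ThrowingRunnable action) {
        try {
            action.run();
        } catch (Exception e) {
            failJob(e);
        }
    }

    /** Single choke point: wrap first, then fail every pending response. */
    void failJob(Throwable original) {
        Throwable portable = toPortable(original);
        for (CompletableFuture<String> future : pending) {
            future.completeExceptionally(portable);
        }
        pending.clear();
    }

    /** Stand-in for the SerializedThrowable wrapping: JDK-only type, cause chain kept as text. */
    static Throwable toPortable(Throwable t) {
        StringWriter text = new StringWriter();
        t.printStackTrace(new PrintWriter(text, true));
        return new RuntimeException(text.toString());
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch();
        CompletableFuture<String> response = coordinator.newPendingResponse();
        coordinator.runInEventLoop(() -> {
            throw new IllegalStateException("schema change failed");
        });
        System.out.println(response.isCompletedExceptionally());
    }
}
```

Keeping the wrapping inside {{failJob}} means a newly added failure path cannot forget it, as long as it goes through {{runInEventLoop}} or {{failJob}}.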

h3. Reproduction

A minimal reproduction (also added as a unit test) serializes a 
{{RuntimeException}} whose cause is a class visible only to a child 
classloader, then deserializes it with a classloader that cannot see that class:
* Without the fix: {{ClassNotFoundException}} -> response lost.
* With the fix ({{SerializedThrowable}}): deserializes successfully and 
preserves the real cause as text.
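
The two outcomes above can be approximated without Flink or a second classloader. The sketch below is not the unit test from the patch. It simulates the isolated classloader with an {{ObjectInputStream}} whose {{resolveClass}} refuses one class, and it uses a plain text-carrying {{RuntimeException}} as a stand-in for {{SerializedThrowable}}. All class and method names ({{IsolatedDeserializationDemo}}, {{DriverOnlyException}}, {{receive}}, {{toPortable}}) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;

final class IsolatedDeserializationDemo {

    /** Hypothetical stand-in for a driver exception only the user classloader can see. */
    static final class DriverOnlyException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        DriverOnlyException(String message) {
            super(message);
        }
    }

    /** Simulates the isolated receiver by refusing to resolve the driver-only class. */
    static final class IsolatedInputStream extends ObjectInputStream {
        IsolatedInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (desc.getName().equals(DriverOnlyException.class.getName())) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Throwable t) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(t);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the deserialized throwable, or null if the receiver could not read it. */
    static Throwable receive(byte[] payload) {
        try (ObjectInputStream in = new IsolatedInputStream(new ByteArrayInputStream(payload))) {
            return (Throwable) in.readObject();
        } catch (ClassNotFoundException | IOException e) {
            return null; // from the caller's point of view the response is lost
        }
    }

    /** Stand-in for SerializedThrowable: JDK-only type carrying the stack trace as text. */
    static Throwable toPortable(Throwable t) {
        StringWriter text = new StringWriter();
        t.printStackTrace(new PrintWriter(text, true));
        return new RuntimeException(text.toString());
    }

    public static void main(String[] args) {
        Throwable failure =
                new RuntimeException("schema change failed", new DriverOnlyException("connection closed"));
        System.out.println("raw delivered:     " + (receive(serialize(failure)) != null));
        System.out.println("wrapped delivered: " + (receive(serialize(toPortable(failure))) != null));
    }
}
```

Unlike this stand-in, {{SerializedThrowable}} also carries the original exception as bytes, as described in the Fix section, so a receiver that does have the class can still recover the original object.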





--
This message was sent by Atlassian Jira
(v8.20.10#820010)
