Jan Gurda created FLINK-35542:
---------------------------------

             Summary: ClassNotFoundException when deserializing CheckpointedOffset
                 Key: FLINK-35542
                 URL: https://issues.apache.org/jira/browse/FLINK-35542
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: jdbc-3.1.2
         Environment: Flink 1.19.0

Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)

JDK 11 (Temurin)
            Reporter: Jan Gurda
             Fix For: jdbc-3.2.0


I am using the latest flink-connector-jdbc code from the main branch, currently 
3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).

 

When a job is interrupted while reading data from the JDBC source (for example, 
by a TaskManager outage), it cannot recover because of the following exception:
{code:java}
java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
    at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
    at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
    ... 22 more {code}
 

In our deployment, we embed the JDBC connector classes in the job JAR file. 
This means that the class 
org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible only 
to the _FlinkUserCodeClassLoader_ and not to the _AppClassLoader_. I believe 
the problem is in the following code snippet, which uses the class loader of 
the JDK's _DataInputStream_ class, a loader that cannot see classes packaged 
in the job JAR:
{code:java}
public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
        throws IOException, ClassNotFoundException {
    // ....
    // Some lines skipped 
    CheckpointedOffset chkOffset =
            InstantiationUtil.deserializeObject(
                    chkOffsetBytes, in.getClass().getClassLoader());

    return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
} {code}
If I change it to the following:
{code:java}
public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
        throws IOException, ClassNotFoundException {
    // .... 
    // Some lines skipped
    CheckpointedOffset chkOffset =
            InstantiationUtil.deserializeObject(
                    chkOffsetBytes, CheckpointedOffset.class.getClassLoader());

    return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
} {code}
Everything works as expected.
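
For illustration, here is a minimal, JDK-only sketch of why the loader choice matters. It is not connector code: the class ClassLoaderSketch, the nested Offset class (a hypothetical stand-in for CheckpointedOffset), and LoaderAwareInputStream are names invented for this example. The null fallback in resolveClass is an assumption about how a class-loader-aware stream typically behaves; the stack trace above suggests InstantiationUtil does something similar, since its resolveClass ends up in ObjectInputStream.resolveClass.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClassLoaderSketch {

    /** Hypothetical stand-in for CheckpointedOffset; not the connector's class. */
    static class Offset implements Serializable {
        private static final long serialVersionUID = 1L;
        final long value;

        Offset(long value) {
            this.value = value;
        }
    }

    /** Resolves classes via an explicit loader, or via the JDK default when the loader is null. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (loader == null) {
                // No loader given: fall back to the JDK's default resolution.
                return super.resolveClass(desc);
            }
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Loader of a JDK stream class: the bootstrap loader, which Java reports as null. */
    static ClassLoader loaderOfStream() {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        return in.getClass().getClassLoader();
    }

    /** Serializes an Offset and reads it back using the loader that defined Offset. */
    static Offset roundTrip(long value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(new Offset(value));
            }
            ClassLoader loader = Offset.class.getClassLoader();
            try (ObjectInputStream in =
                    new LoaderAwareInputStream(
                            new ByteArrayInputStream(buffer.toByteArray()), loader)) {
                return (Offset) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("DataInputStream loader: " + loaderOfStream());
        System.out.println("Offset loader: " + Offset.class.getClassLoader());
        System.out.println("Round-tripped value: " + roundTrip(42L).value);
    }
}
```

The point of the sketch is that the class loader of a JDK type says nothing about where user classes live, whereas the loader of the target class itself is, by definition, able to resolve that class.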
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
