[ 
https://issues.apache.org/jira/browse/KAFKA-19758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-19758.
------------------------------------
    Resolution: Fixed

> Weird behavior on Kafka Connect 4.1 class loading
> -------------------------------------------------
>
>                 Key: KAFKA-19758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19758
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 4.1.0
>            Reporter: Mario Fiore Vitale
>            Assignee: Mickael Maison
>            Priority: Blocker
>             Fix For: 4.2.0, 4.1.1
>
>         Attachments: connect-service.log
>
>
> I have the 
> [DebeziumOpenLineageEmitter|https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java]
>  class in *debezium-openlineage-api*. Internally it keeps a static map of the 
> registered emitters; the key of this map is "connectorLogicalName-taskId".
> Then there is the 
> [OpenLineage SMT|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java],
>  which is part of *debezium-core*. In this SMT, I simply pass the same 
> context to obtain the same emitter that was instantiated by the connector.
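
A minimal sketch of the pattern described above, assuming a static registry keyed by logical name and task id. `EmitterRegistrySketch`, `Emitter`, `init` and `getEmitter` are hypothetical stand-ins for illustration, not the actual Debezium classes or signatures:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EmitterRegistrySketch {

    /** Hypothetical stand-in for an emitter; it only remembers its registry key. */
    public static final class Emitter {
        private final String key;
        Emitter(String key) { this.key = key; }
        public String key() { return key; }
    }

    // Static state belongs to this Class object, so every loaded copy of the
    // class (one per defining class loader) has its own map.
    private static final Map<String, Emitter> EMITTERS = new ConcurrentHashMap<>();

    static String keyOf(String logicalName, String taskId) {
        return logicalName + "-" + taskId;
    }

    /** Called by the connector side: registers (or returns) the emitter for a task. */
    public static Emitter init(String logicalName, String taskId) {
        return EMITTERS.computeIfAbsent(keyOf(logicalName, taskId), Emitter::new);
    }

    /** Called by the SMT side: looks up the emitter registered by the connector. */
    public static Emitter getEmitter(String logicalName, String taskId) {
        String key = keyOf(logicalName, taskId);
        Emitter emitter = EMITTERS.get(key);
        if (emitter == null) {
            throw new IllegalStateException(
                    "Emitter not initialized for " + key + ". Call init() first.");
        }
        return emitter;
    }

    public static void main(String[] args) {
        init("inventory", "0");
        System.out.println(getEmitter("inventory", "0").key());
    }
}
```

The lookup only succeeds if the caller of `getEmitter` sees the same loaded copy of the class as the caller of `init`.
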
> Now I'm running the following image
> {code:java}
> FROM quay.io/debezium/connect:3.3.0.Final
> ENV MAVEN_REPO="https://repo1.maven.org/maven2"
> ENV GROUP_ID="io/debezium"
> ENV DEBEZIUM_VERSION="3.3.0.Final"
> ENV ARTIFACT_ID="debezium-openlineage-core"
> ENV CLASSIFIER="-libs"
> COPY log4j.properties /kafka/config/log4j.properties
> # Add OpenLineage
> RUN mkdir -p /tmp/openlineage-libs && \
>     curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
>     tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
> ADD openlineage.yml /kafka/ {code}
> So it is practically the Debezium Connect image with just the OpenLineage jars 
> copied into the postgres and mongodb connector folders.
> When I register the PostgreSQL connector
> {code:java}
> {
>   "name": "inventory-connector-postgres",
>   "config": {
>     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
>     "tasks.max": "1",
>     "database.hostname": "postgres",
>     "database.port": "5432",
>     "database.user": "postgres",
>     "database.password": "postgres",
>     "database.server.id": "184054",
>     "database.dbname": "postgres",
>     "topic.prefix": "inventory",
>     "snapshot.mode": "initial",
>     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
>     "schema.history.internal.kafka.topic": "schema-changes.inventory",
>     "slot.name": "postgres",
>     "openlineage.integration.enabled": "true",
>     "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
>     "openlineage.integration.job.description": "This connector does cdc for products",
>     "openlineage.integration.tags": "env=prod,team=cdc",
>     "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
>     "transforms": "openlineage",
>     "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
>   }
> } {code}
>  
> I get the following error
> {code:java}
> 2025-10-03T14:22:09,761 ERROR  ||  WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
>     at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
>     at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
>     ... 13 more {code}
> Full logs [^connect-service.log]
>  
> This is evidence that the emitters map is not shared between the connector 
> and the SMT.
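
One way to check this reasoning in isolation: static fields belong to the loaded `Class` object, so the same bytecode defined by two different class loaders ends up with two separate static maps. The sketch below uses only JDK APIs and assumes the compiled class file is readable as a resource; `Registry` and `IsolatingLoader` are hypothetical names, and this is not how Kafka Connect's own plugin class loader is implemented:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class StaticStatePerLoader {

    /** Hypothetical class with static state, standing in for an emitter registry. */
    public static class Registry {
        private static final Set<String> KEYS = ConcurrentHashMap.newKeySet();
        public static void register(String key) { KEYS.add(key); }
        public static boolean contains(String key) { return KEYS.contains(key); }
    }

    /** Defines its own copy of Registry; every other class is delegated to the parent. */
    public static class IsolatingLoader extends ClassLoader {
        private static final String TARGET = Registry.class.getName();

        public IsolatingLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(TARGET)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        // Reads the compiled bytes of the class through the parent loader.
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                return in.readAllBytes();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Registers key via loader a, then asks the Registry seen by loader b for it. */
    public static boolean visibleAcross(ClassLoader a, ClassLoader b, String key) {
        try {
            Class<?> viaA = a.loadClass(Registry.class.getName());
            Class<?> viaB = b.loadClass(Registry.class.getName());
            viaA.getMethod("register", String.class).invoke(null, key);
            return (Boolean) viaB.getMethod("contains", String.class).invoke(null, key);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader app = StaticStatePerLoader.class.getClassLoader();
        ClassLoader x = new IsolatingLoader(app);
        ClassLoader y = new IsolatingLoader(app);
        System.out.println("same loader:      " + visibleAcross(x, x, "inventory-0"));
        System.out.println("different loader: " + visibleAcross(x, y, "inventory-0"));
    }
}
```

If the connector and the SMT resolve the registry class through different loaders, the second case applies: the key registered on one side is not visible on the other, which matches the "not initialized" error above.
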
> The situation becomes weirder if I remove all connectors from the image 
> except PostgreSQL and MongoDB.
> In that case, the PostgreSQL connector works perfectly.
> The plugins are in the folder */kafka/connect* (the only folder configured 
> in `plugin.path`), each in a dedicated folder with its dependencies. 
> I then started adding more connectors, and it continued to work until I added 
> the SQL Server connector.
> To summarize, the problem arises when I add one or more of [sqlserver, 
> spanner, vitess].
>  
> What these connectors seem to have in common is that they support 
> multi-task. The others don't. 
> Am I correct that Kafka Connect guarantees that each connector is loaded with 
> an isolated class loader together with its dependencies, so that the static 
> emitters map should be shared between the connector and the SMT?
> In addition, if I run the 3.2.0.Final image (so Kafka 4.0.0) with all 
> connectors, it works fine.
> I ran further tests, and the results get stranger. All tests were done 
> with *{{plugin.path=/kafka/connect}}* and *KC 4.1*.
> My original tests used this directory structure:
>  
> {code:java}
> /kafka/connect
> |___ debezium-connector-postgres
> |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>  
> In this case, each connector should be isolated from the others (each having 
> a dedicated class loader). With this layout, the sharing between the 
> connector and the SMT does not work on KC 4.1.
> Then I tried with
>  
> {code:java}
> /kafka/connect
> |___ debezium-connectors
>      |___ debezium-connector-postgres
>      |___ debezium-connector-mongodb
>      |___ debezium-connector-sqlserver{code}
>  
> So none of the connectors is isolated; they all share the same class loader. 
> In this case there is no issue, which I would say is expected.
> Then I tried with
>  
> {code:java}
> /kafka/connect
> |___ debezium-connectors
> |    |___ debezium-connector-postgres
> |    |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>  
> where *{{postgres}}* and *{{mongodb}}* are not isolated (same classloader) 
> and *{{sqlserver}}* is isolated (different classloader), and in this case, it 
> still works. I expected this to fail as with the first setup.
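
The three layouts above can be compared with a small sketch. It assumes, as the layouts imply, that each immediate child of the `plugin.path` folder is one plugin location with its own class loader, so nested connector folders share their parent's loader. `PluginLocationSketch`, `isolationUnit` and `shareLoader` are hypothetical helpers, not part of the Kafka Connect API:

```java
import java.nio.file.Path;

public class PluginLocationSketch {

    /** Returns the name of the top-level folder under pluginPath that contains pluginDir. */
    public static String isolationUnit(Path pluginPath, Path pluginDir) {
        Path root = pluginPath.normalize();
        Path dir = pluginDir.normalize();
        if (!dir.startsWith(root) || dir.equals(root)) {
            throw new IllegalArgumentException(dir + " is not inside " + root);
        }
        // The first path element below plugin.path identifies the plugin location.
        return root.relativize(dir).getName(0).toString();
    }

    /** True if both folders fall under the same top-level plugin location. */
    public static boolean shareLoader(Path pluginPath, Path a, Path b) {
        return isolationUnit(pluginPath, a).equals(isolationUnit(pluginPath, b));
    }

    public static void main(String[] args) {
        Path root = Path.of("/kafka/connect");
        // Third layout: postgres and mongodb nested, sqlserver at the top level.
        Path postgres = root.resolve("debezium-connectors/debezium-connector-postgres");
        Path mongodb = root.resolve("debezium-connectors/debezium-connector-mongodb");
        Path sqlserver = root.resolve("debezium-connector-sqlserver");
        System.out.println("postgres/mongodb share a loader:   " + shareLoader(root, postgres, mongodb));
        System.out.println("postgres/sqlserver share a loader: " + shareLoader(root, postgres, sqlserver));
    }
}
```

Under that assumption, the first layout yields three separate units, the second yields one, and the third yields two.
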
> The SMT is in the *debezium-core* jar, and each connector has its own copy.
> So each connector folder contains:
> {code:java}
> debezium-api-3.3.0.Final.jar
> debezium-common-3.3.0.Final.jar
> debezium-connector-[connectorName]-3.3.0.Final.jar
> debezium-core-3.3.0.Final.jar
> debezium-openlineage-api-3.3.0.Final.jar{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
