[ https://issues.apache.org/jira/browse/KAFKA-19758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mickael Maison resolved KAFKA-19758.
------------------------------------
Resolution: Fixed
> Weird behavior on Kafka Connect 4.1 class loading
> -------------------------------------------------
>
> Key: KAFKA-19758
> URL: https://issues.apache.org/jira/browse/KAFKA-19758
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 4.1.0
> Reporter: Mario Fiore Vitale
> Assignee: Mickael Maison
> Priority: Blocker
> Fix For: 4.2.0, 4.1.1
>
> Attachments: connect-service.log
>
>
> I have the
> [DebeziumOpenLineageEmitter|https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java]
> class in *debezium-openlineage-api*, which internally keeps a static map of
> the registered emitters, keyed by "connectorLogicalName-taskId".
> Then there is the [OpenLineage
> SMT|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java],
> which is part of *debezium-core*. In this SMT, I pass the same context as
> the connector, so that the SMT gets the same emitter instance that the
> connector initialized.
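A minimal sketch of the registry pattern described above, assuming a plain static map keyed by "connectorLogicalName-taskId". The class {{EmitterRegistry}}, its methods, and the use of a String as a stand-in for an emitter are hypothetical; this is not Debezium's {{DebeziumOpenLineageEmitter}} code. The connector side calls {{init()}}, and the SMT side later calls {{getEmitter()}} with the same context values, failing with an {{IllegalStateException}} if nothing was registered under that key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry; NOT Debezium's DebeziumOpenLineageEmitter.
// A String stands in for a real emitter object.
final class EmitterRegistry {

    // A static field belongs to the loaded class: every class loader that defines
    // EmitterRegistry gets its own, independent EMITTERS map.
    private static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();

    private EmitterRegistry() {
    }

    // Key format taken from the report: "connectorLogicalName-taskId".
    static String key(String connectorLogicalName, int taskId) {
        return connectorLogicalName + "-" + taskId;
    }

    // Connector side: register an emitter for this task (assumed to run first).
    static void init(String connectorLogicalName, int taskId) {
        String k = key(connectorLogicalName, taskId);
        EMITTERS.putIfAbsent(k, "emitter for " + k);
    }

    // SMT side: look up the emitter using the same context values.
    static String getEmitter(String connectorLogicalName, int taskId) {
        String k = key(connectorLogicalName, taskId);
        String emitter = EMITTERS.get(k);
        if (emitter == null) {
            throw new IllegalStateException("Emitter not initialized for " + k + ". Call init() first.");
        }
        return emitter;
    }

    public static void main(String[] args) {
        init("inventory", 0);                            // connector task 0 starts
        System.out.println(getEmitter("inventory", 0));  // SMT in the same loader: found
        try {
            getEmitter("inventory", 1);                  // no init() for task 1
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The lookup can only succeed if {{init()}} and {{getEmitter()}} operate on the same {{EMITTERS}} map, which requires that the connector and the SMT see the same loaded copy of the class.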
> I'm running the following image:
> {code:java}
> FROM quay.io/debezium/connect:3.3.0.Final
> ENV MAVEN_REPO="https://repo1.maven.org/maven2"
> ENV GROUP_ID="io/debezium"
> ENV DEBEZIUM_VERSION="3.3.0.Final"
> ENV ARTIFACT_ID="debezium-openlineage-core"
> ENV CLASSIFIER="-libs"
> COPY log4j.properties /kafka/config/log4j.properties
> # Add OpenLineage
> RUN mkdir -p /tmp/openlineage-libs && \
>     curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" \
>       -o /tmp/debezium-openlineage-core-libs.tar.gz && \
>     tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
> ADD openlineage.yml /kafka/
> {code}
> So it is essentially the Debezium Connect image, with the OpenLineage JARs
> copied into the PostgreSQL and MongoDB connector folders.
> When I register the PostgreSQL connector:
> {code:java}
> {
>   "name": "inventory-connector-postgres",
>   "config": {
>     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
>     "tasks.max": "1",
>     "database.hostname": "postgres",
>     "database.port": "5432",
>     "database.user": "postgres",
>     "database.password": "postgres",
>     "database.server.id": "184054",
>     "database.dbname": "postgres",
>     "topic.prefix": "inventory",
>     "snapshot.mode": "initial",
>     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
>     "schema.history.internal.kafka.topic": "schema-changes.inventory",
>     "slot.name": "postgres",
>     "openlineage.integration.enabled": "true",
>     "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
>     "openlineage.integration.job.description": "This connector does cdc for products",
>     "openlineage.integration.tags": "env=prod,team=cdc",
>     "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
>     "transforms": "openlineage",
>     "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
>   }
> }
> {code}
>
> I get the following error
> {code:java}
> 2025-10-03T14:22:09,761 ERROR || WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
>     at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
>     at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
>     ... 13 more
> {code}
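> Full logs: [^connect-service.log]
>
> This is evidence that the emitters map is not shared between the connector
> and the SMT. Since a static field belongs to a class as defined by one
> particular class loader, this would happen if the connector and the SMT each
> use a copy of DebeziumOpenLineageEmitter loaded by a different class loader.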
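Why a static map might not be shared can be sketched in plain Java. This is a hypothetical illustration, not Kafka Connect's plugin class loader: {{IsolatingLoader}} defines one chosen class from its own copy of the bytes instead of delegating to its parent, so two such loaders produce two distinct {{Class}} objects with the same name, each with its own static state. The sketch assumes the compiled bytes of the nested {{Registry}} class can be read as a resource through the application class loader, as is the case when running from a directory of compiled classes or a JAR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration, NOT Kafka Connect's PluginClassLoader: static state is
// per (class name, defining class loader) pair, so two loaders that each define a
// class get two independent copies of its static fields.
class ClassLoaderIsolationDemo {

    // Stand-in for a class that keeps a static registry (such as an emitter map).
    public static class Registry {
        private static final Set<String> KEYS = new HashSet<>();

        public static void register(String key) { KEYS.add(key); }

        public static boolean contains(String key) { return KEYS.contains(key); }
    }

    // Child-first for exactly one class name: defines it from its own copy of the
    // bytes instead of delegating to the parent; everything else is parent-first.
    static final class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name); // reuse our own copy if already defined
                if (c == null) {
                    String resource = name.replace('.', '/') + ".class";
                    try (InputStream in = getParent().getResourceAsStream(resource)) {
                        if (in == null) {
                            throw new ClassNotFoundException(name);
                        }
                        byte[] bytes = in.readAllBytes();
                        c = defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }
    }

    // Registers a key through one copy of Registry, then looks it up through another.
    static boolean visibleAcross(Class<?> writer, Class<?> reader, String key)
            throws ReflectiveOperationException {
        writer.getMethod("register", String.class).invoke(null, key);
        return (boolean) reader.getMethod("contains", String.class).invoke(null, key);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        ClassLoader app = ClassLoaderIsolationDemo.class.getClassLoader();
        String name = Registry.class.getName();

        IsolatingLoader shared = new IsolatingLoader(app, name);
        Class<?> sameA = shared.loadClass(name);
        Class<?> sameB = shared.loadClass(name);
        Class<?> isolatedA = new IsolatingLoader(app, name).loadClass(name);
        Class<?> isolatedB = new IsolatingLoader(app, name).loadClass(name);

        System.out.println("same loader:       " + visibleAcross(sameA, sameB, "inventory-0"));
        System.out.println("different loaders: " + visibleAcross(isolatedA, isolatedB, "inventory-0"));
    }
}
```

Under that assumption, {{main}} prints {{true}} for the shared loader and {{false}} for the two isolated loaders: the class name is identical in both cases, but a key registered in one copy of {{Registry}} is invisible to the other.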
> The situation gets stranger: if I remove all connectors from the image
> except PostgreSQL and MongoDB, the PostgreSQL connector works perfectly.
> The plugins are in */kafka/connect* (the only folder configured in
> {{plugin.path}}), each in a dedicated subfolder together with its
> dependencies.
> I then started adding more connectors, and it kept working until I added
> the SQL Server connector.
> To summarize, the problem arises when I add any or all of [sqlserver,
> spanner, vitess].
>
> What these connectors seem to have in common is that they support multiple
> tasks; the others don't.
> Am I correct that Kafka Connect guarantees that each connector is loaded by
> an isolated class loader together with its dependencies, so that the static
> emitters map should be shared between the connector and the SMT?
> In addition, if I run the 3.2.0.Final image (i.e., Kafka 4.0.0) with all
> connectors, it works fine.
> I ran further tests, and things got even stranger. All tests were done
> with *{{plugin.path=/kafka/connect}}* and *KC 4.1*.
> My original tests used this directory structure:
>
> {code:java}
> /kafka/connect
> |___ debezium-connector-postgres
> |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>
> In this case, each connector should be isolated from the others (with a
> dedicated class loader), and sharing between the connector and the SMT does
> not work on KC 4.1.
> Then I tried with
>
> {code:java}
> /kafka/connect
> |___ debezium-connectors
> |___ debezium-connector-postgres
> |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>
> Here the connectors are not isolated from each other and share the same
> class loader. In this case there is no issue, which I'd say is expected.
> Then I tried with
>
> {code:java}
> /kafka/connect
> |___ debezium-connectors
> | |___ debezium-connector-postgres
> | |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>
> where *{{postgres}}* and *{{mongodb}}* are not isolated from each other (same
> class loader) and *{{sqlserver}}* is isolated (different class loader). In
> this case it still works, although I expected it to fail as in the first setup.
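The three layouts can be modeled with a simplified, hypothetical grouping function, assuming the documented {{plugin.path}} semantics: each immediate child of a {{plugin.path}} directory is one plugin location with its own class loader, and every JAR beneath that child belongs to it. Kafka Connect's real scanning code (in {{org.apache.kafka.connect.runtime.isolation}}) handles more cases, such as directories of plain class files; {{IsolationUnits.group}} only captures the folder-to-loader mapping the tests above rely on, and it does not explain why 4.1.0 behaved as reported.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of plugin.path isolation, NOT Kafka Connect's scanner:
// each immediate child of the plugin.path directory is one plugin location with its
// own class loader, and every JAR anywhere beneath that child belongs to it.
final class IsolationUnits {

    private IsolationUnits() {
    }

    static Map<String, List<String>> group(String pluginPath, List<String> jarPaths) {
        String prefix = pluginPath.endsWith("/") ? pluginPath : pluginPath + "/";
        Map<String, List<String>> units = new TreeMap<>();
        for (String jar : jarPaths) {
            if (!jar.startsWith(prefix)) {
                continue; // outside plugin.path: not a plugin in this model
            }
            String relative = jar.substring(prefix.length());
            int slash = relative.indexOf('/');
            // A JAR sitting directly in plugin.path is treated as its own location.
            String unit = slash < 0 ? relative : relative.substring(0, slash);
            units.computeIfAbsent(unit, k -> new ArrayList<>()).add(jar);
        }
        return units;
    }

    public static void main(String[] args) {
        // Third layout from the report: postgres and mongodb under one shared folder.
        List<String> jars = List.of(
                "/kafka/connect/debezium-connectors/debezium-connector-postgres/debezium-core-3.3.0.Final.jar",
                "/kafka/connect/debezium-connectors/debezium-connector-mongodb/debezium-core-3.3.0.Final.jar",
                "/kafka/connect/debezium-connector-sqlserver/debezium-core-3.3.0.Final.jar");
        group("/kafka/connect", jars).forEach((unit, members) ->
                System.out.println(unit + " -> " + members.size() + " JAR(s), one class loader"));
    }
}
```

In this model the first layout yields three class loaders, the second yields one, and the third yields two (one shared by PostgreSQL and MongoDB, one for SQL Server).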
> The SMT is in the *debezium-core* JAR, and each connector has its own copy
> of it. So each connector folder contains:
> {code:java}
> debezium-api-3.3.0.Final.jar
> debezium-common-3.3.0.Final.jar
> debezium-connector-[connectorName]-3.3.0.Final.jar
> debezium-core-3.3.0.Final.jar
> debezium-openlineage-api-3.3.0.Final.jar{code}
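Because each connector folder ships its own copy of *debezium-core* (which contains the SMT) and *debezium-openlineage-api* (which contains {{DebeziumOpenLineageEmitter}}), every isolated folder can end up with its own copy of the static emitters map. A small helper makes such duplicates visible; {{SharedJarReport}} and its input format (folder name mapped to JAR file names) are illustrative assumptions, not part of Kafka Connect or Debezium.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Connect or Debezium: lists JAR file names
// that occur in more than one plugin folder (i.e. are loaded once per folder).
final class SharedJarReport {

    private SharedJarReport() {
    }

    static Map<String, Set<String>> duplicates(Map<String, List<String>> jarsByFolder) {
        Map<String, Set<String>> foldersByJar = new TreeMap<>();
        jarsByFolder.forEach((folder, jars) ->
                jars.forEach(jar -> foldersByJar.computeIfAbsent(jar, k -> new TreeSet<>()).add(folder)));
        foldersByJar.values().removeIf(folders -> folders.size() < 2); // keep only shared JARs
        return foldersByJar;
    }

    public static void main(String[] args) {
        Map<String, List<String>> layout = Map.of(
                "debezium-connector-postgres", List.of(
                        "debezium-core-3.3.0.Final.jar",
                        "debezium-openlineage-api-3.3.0.Final.jar",
                        "debezium-connector-postgres-3.3.0.Final.jar"),
                "debezium-connector-sqlserver", List.of(
                        "debezium-core-3.3.0.Final.jar",
                        "debezium-openlineage-api-3.3.0.Final.jar",
                        "debezium-connector-sqlserver-3.3.0.Final.jar"));
        duplicates(layout).forEach((jar, folders) -> System.out.println(jar + " -> " + folders));
    }
}
```

If the two folders are isolated, each JAR reported here is defined once per class loader, so the connector and the SMT only share an emitters map when both classes are resolved from the same folder's copy.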
--
This message was sent by Atlassian Jira
(v8.20.10#820010)