[ https://issues.apache.org/jira/browse/FLINK-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17048804#comment-17048804 ]
Guowei Ma commented on FLINK-16262:
-----------------------------------

Thanks for [~gjy]'s explanation. This also reminds me of one thing: currently, in the YARN/Mesos per-job mode, the user class loader is not enabled by default. I think we should maybe keep the same behavior in per-job clusters. For example, we could provide an argument --with-usrlib for build.sh; only if the user passes this parameter to build.sh would we copy the user jar into the usrlib/ directory. What do you think [~gjy]?

> Class loader problem with FlinkKafkaProducer.Semantic.EXACTLY_ONCE and usrlib directory
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-16262
>                 URL: https://issues.apache.org/jira/browse/FLINK-16262
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>         Environment: openjdk:11-jre with a slightly modified Flink 1.10.0 build (nothing changed regarding Kafka and/or class loading).
>            Reporter: Jürgen Kreileder
>            Assignee: Guowei Ma
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.1, 1.11.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> We're using Docker images modeled after
> [https://github.com/apache/flink/blob/master/flink-container/docker/Dockerfile]
> (using Java 11).
>
> When I try to switch a Kafka producer from AT_LEAST_ONCE to EXACTLY_ONCE, the taskmanager startup fails with:
> {code:java}
> 2020-02-24 18:25:16.389 INFO  o.a.f.r.t.Task  Create Case Fixer -> Sink: Findings local-krei04-kba-digitalweb-uc1 (1/1) (72f7764c6f6c614e5355562ed3d27209) switched from RUNNING to FAILED.
> org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>     at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:718)
>     at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:471)
>     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>     at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:396)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
>     at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
>     at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
>     at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source){code}
>
> This looks like a class loading issue: if I copy our JAR to FLINK_LIB_DIR instead of FLINK_USR_LIB_DIR, everything works fine.
>
> (AT_LEAST_ONCE producers work fine with the JAR in FLINK_USR_LIB_DIR)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
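For context, a minimal sketch of the kind of producer setup the report describes, assuming Flink 1.10's universal Kafka connector. The topic name, broker address, source elements, and timeouts are illustrative assumptions, not details from the report. The job jar built from such code (bundling the Kafka client classes) is what gets placed in either FLINK_LIB_DIR or FLINK_USR_LIB_DIR; switching the last constructor argument from AT_LEAST_ONCE to EXACTLY_ONCE is the change that apparently triggers the failure above when the jar sits in usrlib/, since the transactional path goes through FlinkKafkaProducer.abortTransactions (see stack trace), where Kafka has to resolve ByteArraySerializer by name.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EXACTLY_ONCE producers are transactional and require checkpointing.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // assumed broker address
        // Keep the transaction timeout below the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "900000");

        // Hypothetical schema: write each String element as a UTF-8 value record.
        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("findings", element.getBytes(StandardCharsets.UTF_8));

        // Changing AT_LEAST_ONCE to EXACTLY_ONCE here is what makes the task fail
        // when the job jar is deployed under usrlib/ instead of lib/.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "findings", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c")   // placeholder source for the sketch
                .addSink(producer)
                .name("Sink: Findings");

        env.execute("exactly-once Kafka sink (sketch)");
    }
}
{code}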