Have you set anything beyond the defaults in the Flink configuration?
This could just be noise with some Kafka stuff running in the background
while Flink is shutting things down (and closing the classloader).
On 04/02/2022 15:29, HG wrote:
Hi,
I am developing my flink application.
For start I have built a class that reads events from Kafka and
outputs them datastream.print()
The job runs every time.
But starting with the 2nd time I see this in the standalone session log:
2022-02-04 15:16:30,801 WARN org.apache.kafka.common.utils.Utils []
- Failed to close KafkaClient with type
org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError:
org/apache/kafka/common/network/Selector$CloseMode
at
org.apache.kafka.common.network.Selector.close(Selector.java:806)
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at
org.apache.kafka.common.network.Selector.close(Selector.java:365)
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at
org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at
org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.network.Selector$CloseMode
at java.net.URLClassLoader.findClass(URLClassLoader.java:476)
~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
... 6 more
2022-02-04 15:16:30,802 INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] -
Source coordinator for source Source: Kafka Source -> Sink: Print to
Std. Out closed.
Am I doing something wrong?
This is basically the gist of the code:
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(brokers)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setValueOnlyDeserializer(new SimpleStringSchema())
//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
//withIdleness.duration() //env.fromSource(source,
WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"); DataStream<String> ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(),"Kafka Source");
ds.print();