Have you set anything beyond the defaults in the Flink configuration?

This could just be noise: a Kafka background thread may still be running while Flink shuts the job down and closes the user-code classloader.
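
The effect can be reproduced outside Flink with a minimal sketch. This is not Flink or Kafka code: `Client` and `Mode` are hypothetical stand-ins for a class that first touches a nested class only when it is closed, and the sketch assumes a JDK (for the runtime compiler). Code loaded through a `URLClassLoader` keeps running after the loader is closed, but the first reference to a class that was not loaded yet should fail with a `NoClassDefFoundError`.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

class ClosedClassLoaderDemo {

    // Hypothetical client: Client$Mode is first referenced inside run(),
    // so the JVM resolves it lazily, only when run() executes.
    private static final String CLIENT_SOURCE =
        "public class Client implements Runnable {\n"
      + "    enum Mode { GRACEFUL }\n"
      + "    public void run() { System.out.println(\"closing with \" + Mode.GRACEFUL); }\n"
      + "}\n";

    /** Returns the simple name of the error raised by Client.run() after its loader was closed. */
    static String errorAfterLoaderClose() throws Exception {
        // Compile the stand-in class into a temporary directory.
        Path dir = Files.createTempDirectory("closed-loader-demo");
        Path src = dir.resolve("Client.java");
        Files.write(src, CLIENT_SOURCE.getBytes(StandardCharsets.UTF_8));

        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("no system Java compiler available (JDK required)");
        }
        if (compiler.run(null, null, null, "-d", dir.toString(), src.toString()) != 0) {
            throw new IllegalStateException("compilation of Client.java failed");
        }

        // Load Client through its own loader, as user code is loaded from a job jar.
        URLClassLoader loader = new URLClassLoader(new URL[] {dir.toUri().toURL()}, null);
        Runnable client = (Runnable) loader.loadClass("Client").getDeclaredConstructor().newInstance();

        // Simulate job shutdown: the loader is closed while "client" is still in use.
        loader.close();

        try {
            client.run(); // needs Client$Mode, which the closed loader can no longer find
            return "none";
        } catch (NoClassDefFoundError e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(errorAfterLoaderClose());
    }
}
```

If that is what happens here, the warning only affects a thread that is already shutting down, which would fit with the job itself completing.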

On 04/02/2022 15:29, HG wrote:
Hi,

I am developing my Flink application.
To start, I have built a class that reads events from Kafka and prints them with datastream.print().

The job runs every time.
But starting with the second run, I see this in the standalone session log:

2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils  [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
        at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
        at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
        ... 6 more
2022-02-04 15:16:30,802 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.

Am I doing something wrong?

This is basically the gist of the code:

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(brokers)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setValueOnlyDeserializer(new SimpleStringSchema())
        //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

//withIdleness.duration()
//env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream<String> ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

ds.print();
