Hi, I am developing my Flink application. To start, I have built a class that reads events from Kafka and outputs them with datastream.print().
The job runs successfully every time, but starting with the second run I see this in the standalone session log:

2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
    at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
    at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
    ... 6 more
2022-02-04 15:16:30,802 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.

Am I doing something wrong? This is basically the gist of the code:

    KafkaSource<String> source = KafkaSource
            .<String>builder()
            .setBootstrapServers(brokers)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setValueOnlyDeserializer(new SimpleStringSchema())
            //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setBounded(OffsetsInitializer.latest())
            .build();

    //withIdleness.duration()
    //env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
    DataStream<String> ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

    ds.print();