Hi,

I am developing a Flink application.
To start, I have built a class that reads events from Kafka and prints
them with datastream.print().

The job runs every time I submit it.
But starting with the second run, I see this in the standalone session log:

2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
        at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
        at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
        ... 6 more
2022-02-04 15:16:30,802 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.

Am I doing something wrong?

This is basically the gist of the code:

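import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Bounded Kafka source: read from the earliest offsets up to the latest
// offsets, then stop.
KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(brokers)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setValueOnlyDeserializer(new SimpleStringSchema())
        //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

//withIdleness.duration()
//env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream<String> ds = env.fromSource(source,
        WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");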

ds.print();
