Hello Team,

I'm new to Flink and I'm writing a job that reads data from Kafka and sinks it to InfluxDB. I based it on the approach used in this example: https://github.com/apache/bahir-flink/blob/master/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
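Before the full job, here is a minimal sketch that exercises only the Bahir InfluxDB connector, with Kafka taken out of the picture. The config values mirror the ones in my real job below and the connector classes are the same; my assumption is that if the classpath is the problem, this should fail at the same InfluxDBConfig.build() call as the full job (that call needs flink-core on the classpath, which is where org.apache.flink.util.Preconditions lives):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
import org.json.JSONObject;

import java.util.HashMap;

public class InfluxSinkSmokeTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Building the config is the step my stack trace points at
        // (InfluxDBConfig$Builder.build), so a broken classpath should
        // already fail here, before any data flows.
        InfluxDBConfig influxDBConfig =
                InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
                        .build();

        // One hard-coded JSON record instead of a Kafka source, so only
        // the InfluxDB side of the job is exercised.
        env.fromElements("{\"first_name\":\"Jane\",\"last_name\":\"Doe\"}")
                .map(new MapFunction<String, InfluxDBPoint>() {
                    @Override
                    public InfluxDBPoint map(String value) throws Exception {
                        JSONObject jsonObj = new JSONObject(value);
                        HashMap<String, String> tags = new HashMap<>();
                        tags.put("source", "smoke-test");
                        HashMap<String, Object> fields = new HashMap<>();
                        fields.put("first_name", jsonObj.getString("first_name"));
                        fields.put("last_name", jsonObj.getString("last_name"));
                        return new InfluxDBPoint("influxConnect", System.currentTimeMillis(), tags, fields);
                    }
                })
                .addSink(new InfluxDBSink(influxDBConfig));

        env.execute("InfluxDB sink smoke test");
    }
}

If this sketch fails the same way, the Kafka side of my job is presumably not the culprit.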
My full job:

package com.dataartisans;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Read raw JSON strings from the Kafka topic passed via --topic.
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
                parameterTool.getRequired("topic"),
                new SimpleStringSchema(),
                parameterTool.getProperties()));

        // Convert each JSON message into an InfluxDB point.
        DataStream<InfluxDBPoint> formatStream = messageStream.rebalance()
                .map(new MapFunction<String, InfluxDBPoint>() {
                    private static final long serialVersionUID = -6867736771747690202L;

                    @Override
                    public InfluxDBPoint map(String value) throws Exception {
                        JSONObject jsonObj = new JSONObject(value);

                        HashMap<String, String> tags = new HashMap<>();
                        tags.put("source", "kafka");
                        tags.put("sink", "InfluxDB");

                        HashMap<String, Object> fields = new HashMap<>();
                        fields.put("first_name", jsonObj.getString("first_name"));
                        fields.put("last_name", jsonObj.getString("last_name"));

                        return new InfluxDBPoint("influxConnect", System.currentTimeMillis(), tags, fields);
                    }
                });

        InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
                .batchActions(1000)
                .flushDuration(100, TimeUnit.MILLISECONDS)
                .enableGzip(true)
                .build();

        formatStream.addSink(new InfluxDBSink(influxDBConfig));
        env.execute("InfluxDB Sink Example");
    }
}

When I run it, it throws:

12:41:51,364 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get topic metadata from broker localhost:9092 in try 0/3
12:41:51,876 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Topic flinkkafka has 1 partitions
12:41:51,928 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint must have a default constructor to be used as a POJO.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/util/Preconditions
    at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig.<init>(InfluxDBConfig.java:44)
    at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig$Builder.build(InfluxDBConfig.java:221)
    at com.dataartisans.ReadFromKafka.main(ReadFromKafka.java:67)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.Preconditions
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 3 more

Can someone please help me resolve this? org.apache.flink.util.Preconditions is part of flink-core, so my guess is that my runtime classpath is either missing flink-core or mixing Flink versions (FlinkKafkaConsumer082 comes from a pre-1.0 Kafka connector, while the Bahir InfluxDB connector is built against Flink 1.x), but I'm not sure how to confirm that.

Thanks,
Abhijeet Kumar
Software Development Engineer, Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data!
abhijeet.ku...@sentienz.com | www.sentienz.com | Bengaluru