Hello Team,

I'm new to Flink and am writing a Flink job that reads data from Kafka and 
sinks it to InfluxDB. I followed the approach used in the Apache Bahir example:

https://github.com/apache/bahir-flink/blob/master/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java

package com.dataartisans;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class ReadFromKafka {

        public static void main(String[] args) throws Exception {

                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();

                ParameterTool parameterTool = ParameterTool.fromArgs(args);

                // Read raw JSON strings from the Kafka topic given via --topic
                DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
                        parameterTool.getRequired("topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));
                
                // Convert each JSON message into an InfluxDB point
                DataStream<InfluxDBPoint> formatStream = messageStream.rebalance().map(new MapFunction<String, InfluxDBPoint>() {
                        private static final long serialVersionUID = -6867736771747690202L;

                        @Override
                        public InfluxDBPoint map(String value) throws Exception {
                                JSONObject jsonObj = new JSONObject(value);

                                HashMap<String, String> tags = new HashMap<>();
                                tags.put("source", "kafka");
                                tags.put("sink", "InfluxDB");

                                HashMap<String, Object> fields = new HashMap<>();
                                fields.put("first_name", jsonObj.getString("first_name"));
                                fields.put("last_name", jsonObj.getString("last_name"));

                                return new InfluxDBPoint("influxConnect", System.currentTimeMillis(), tags, fields);
                        }
                });
                
                InfluxDBConfig influxDBConfig = InfluxDBConfig
                        .builder("http://localhost:8086", "root", "root", "db_flink_test")
                        .batchActions(1000)
                        .flushDuration(100, TimeUnit.MILLISECONDS)
                        .enableGzip(true)
                        .build();
                
                formatStream.addSink(new InfluxDBSink(influxDBConfig));

                env.execute("InfluxDB Sink Example");
        }
}

Running this throws the following error:

12:41:51,364 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Trying to get topic metadata from broker localhost:9092 in try 0/3
12:41:51,876 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer  - Topic flinkkafka has 1 partitions
12:41:51,928 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint must have a default constructor to be used as a POJO.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/util/Preconditions
        at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig.<init>(InfluxDBConfig.java:44)
        at org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig$Builder.build(InfluxDBConfig.java:221)
        at com.dataartisans.ReadFromKafka.main(ReadFromKafka.java:67)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.util.Preconditions
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 3 more
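
The trace says that org.apache.flink.util.Preconditions could not be loaded 
when InfluxDBConfig was constructed. A minimal sketch for checking whether a 
given class is visible at run time is shown below; ClasspathCheck is a 
hypothetical helper (not part of Flink or Bahir) and would need to be run with 
the same classpath as the job for the result to be meaningful.

```java
final class ClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace above
        String name = args.length > 0 ? args[0] : "org.apache.flink.util.Preconditions";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```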

Could someone please help me resolve this problem?

Thanks,

        
Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !
abhijeet.ku...@sentienz.com | www.sentienz.com | Bengaluru

