Flink falls back to Kryo serialization for types it can't treat as POJOs, and Kryo doesn't support Joda-Time object serialization out of the box.
Use java.util.Date instead, or you will have to change the Kryo setup (for example, by registering a custom serializer).

Thanks,
Arpit

On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote:
> Hi
>
> I have a problem running a sample code from the hands-on examples of
> Apache Flink.
> I used the following code to send the output of a stream to an already
> running Apache Kafka, and I get the error below. Could anyone tell me
> what is going wrong?
>
> Best regards
> Ahmad
>
> public class RideCleansing {
>
>   private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>   public static final String CLEANSED_RIDES_TOPIC = "mytopic";
>
>   public static void main(String[] args) throws Exception {
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     DataStream<TaxiRide> rides = env.addSource(
>         new TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));
>
>     DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());
>
>     filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
>         CLEANSED_RIDES_TOPIC,
>         new TaxiRideSchema()));
>
>     env.execute("Taxi Ride Cleansing");
>   }
> }
>
> Error:
> 18:43:15,734 INFO org.apache.flink.api.java.typeutils.TypeExtractor -
> class org.joda.time.DateTime is not a valid POJO type
> Exception in thread "main" java.lang.NoClassDefFoundError:
> kafka/producer/Partitioner
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>   at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>   at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 13 more
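
On the java.util.Date suggestion at the top of this reply, here is a rough sketch of what such an event type could look like. RideEvent and its fields are made up for illustration; this is not the TaxiRide class from the training exercises, and I am only assuming the timestamp is the sole Joda-Time field involved. The class sticks to the usual POJO shape (public class, public no-argument constructor, public fields).

```java
import java.util.Date;

// Hypothetical event type for illustration only; the real TaxiRide
// class from the training exercises may look quite different.
public class RideEvent {

    // Public, non-final fields plus a public no-argument constructor
    // keep the class in plain POJO shape.
    public long rideId;
    public Date startTime; // java.util.Date in place of org.joda.time.DateTime

    public RideEvent() {}

    public RideEvent(long rideId, long startMillis) {
        this.rideId = rideId;
        this.startTime = new Date(startMillis);
    }

    // Epoch milliseconds, convenient when writing the event out as text.
    public long startMillis() {
        return startTime.getTime();
    }
}
```

If existing code already produces a Joda DateTime, its getMillis() value can be passed as startMillis, so the conversion stays in one place.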