The Flink Kafka Consumer has never been tested with Kafka 0.10. Could you try it with Kafka 0.9? The 0.10 release is still very new, and we have yet to provide a consumer for it.
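In case it helps narrow things down, here is a minimal sketch that probes whether the class named in the stack trace, kafka.producer.Partitioner, is visible on the application classpath. The helper class ClasspathCheck is hypothetical and is not part of Flink or Kafka; it only uses the standard Class.forName call. Running it with the same classpath as the job should print whether the class can be loaded.

```java
/**
 * Hypothetical helper, not part of Flink or Kafka: reports whether a class
 * can be loaded from the current classpath.
 */
final class ClasspathCheck {

    /** Returns true if the named class is loadable, without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, i.e. the class file was
            // located but something it depends on could not be loaded.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace in the quoted message.
        String name = "kafka.producer.Partitioner";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If the check reports that the class is missing, that would be consistent with the NoClassDefFoundError in the quoted message, though it does not by itself say which dependency should provide the class.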
On Wed, 1 Jun 2016 at 10:47 ahmad Sa P <aspp...@gmail.com> wrote:

> Hi Aljoscha,
> I have tried different version of Flink V 1.0.0 and 1.0.3 and Kafka
> version 0.10.0.0.
> Ahmad
>
> On Wed, Jun 1, 2016 at 10:39 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> This is unrelated to joda time or Kryo, that's just an info message in
>> the log.
>>
>> What version of Flink and Kafka are you using?
>>
>> On Wed, 1 Jun 2016 at 07:02 arpit srivastava <arpit8...@gmail.com> wrote:
>>
>>> Flink uses kryo serialization which doesn't support joda time object
>>> serialization.
>>>
>>> Use java.util.date or you have to change kryo.
>>>
>>> Thanks,
>>> Arpit
>>>
>>> On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote:
>>>
>>>> Hi
>>>> I have a problem at running a sample code from the hands-in examples of
>>>> Apache Flink,
>>>> I used the following code to send output of a stream to already
>>>> running Apache Kafka, and get the below error. Could anyone tell me what is
>>>> going wrong?
>>>>
>>>> Best regards
>>>> Ahmad
>>>>
>>>> public class RideCleansing {
>>>>
>>>>     private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>>>>     public static final String CLEANSED_RIDES_TOPIC = "mytopic";
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         DataStream<TaxiRide> rides = env.addSource(new
>>>>             TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));
>>>>
>>>>         DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());
>>>>
>>>>         filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
>>>>             CLEANSED_RIDES_TOPIC,
>>>>             new TaxiRideSchema()));
>>>>
>>>>         env.execute("Taxi Ride Cleansing");
>>>>     }
>>>>
>>>> Error:
>>>> 18:43:15,734 INFO org.apache.flink.api.java.typeutils.TypeExtractor
>>>> - class org.joda.time.DateTime is not a valid POJO type
>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>> kafka/producer/Partitioner
>>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     at com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
>>>> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     ... 13 more