I tested it with Kafka 0.9.0.1, and the problem still exists.

On Wed, Jun 1, 2016 at 11:50 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> The Flink Kafka Consumer was never tested with Kafka 0.10. Could you try
> it with 0.9? The 0.10 release is still very new and we have yet to provide
> a consumer for that.
>
> On Wed, 1 Jun 2016 at 10:47 ahmad Sa P <aspp...@gmail.com> wrote:
>
>> Hi Aljoscha,
>> I have tried different versions of Flink (1.0.0 and 1.0.3) with Kafka
>> version 0.10.0.0.
>> Ahmad
>>
>> On Wed, Jun 1, 2016 at 10:39 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> This is unrelated to Joda-Time or Kryo; that's just an info message in
>>> the log.
>>>
>>> What version of Flink and Kafka are you using?
>>>
>>> On Wed, 1 Jun 2016 at 07:02 arpit srivastava <arpit8...@gmail.com>
>>> wrote:
>>>
>>>> Flink uses Kryo serialization, which doesn't support Joda-Time object
>>>> serialization.
>>>>
>>>> Use java.util.Date, or you will have to change Kryo.
>>>>
>>>> Thanks,
>>>> Arpit
>>>>
>>>> On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>> I have a problem running a sample code from the hands-on examples
>>>>> of Apache Flink.
>>>>> I used the following code to send the output of a stream to an already
>>>>> running Apache Kafka, and got the error below. Could anyone tell me what
>>>>> is going wrong?
>>>>>
>>>>> Best regards
>>>>> Ahmad
>>>>>
>>>>> public class RideCleansing {
>>>>>
>>>>>     private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>>>>>     public static final String CLEANSED_RIDES_TOPIC = "mytopic";
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>
>>>>>         StreamExecutionEnvironment env =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         DataStream<TaxiRide> rides = env.addSource(
>>>>>             new TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));
>>>>>
>>>>>         DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());
>>>>>
>>>>>         filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
>>>>>             CLEANSED_RIDES_TOPIC,
>>>>>             new TaxiRideSchema()));
>>>>>
>>>>>         env.execute("Taxi Ride Cleansing");
>>>>>     }
>>>>> }
>>>>>
>>>>> Error:
>>>>> 18:43:15,734 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime is not a valid POJO type
>>>>> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/producer/Partitioner
>>>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>>>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>>>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
>>>>> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     ... 13 more
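
The trace above ends in a ClassNotFoundException for kafka.producer.Partitioner, which suggests the class is missing from the application classpath when the Kafka sink is created. That would point at the jars the project pulls in rather than at the broker version, which may be why switching brokers did not change anything. A minimal sketch for checking this from inside the same project follows; ClasspathCheck and isOnClasspath are hypothetical names introduced only for illustration, and the sketch assumes it is run with the same classpath as RideCleansing.

```java
public class ClasspathCheck {

    // Returns true if the named class can be located by this application's
    // class loader. The class is not initialized, only looked up.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised while loading.
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace of the original report.
        String name = "kafka.producer.Partitioner";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this reports false, a likely next step is to compare the Kafka-related dependencies declared in the project's build file with the ones the Flink Kafka connector version in use expects.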