I run it in Eclipse IDE, On Wed, Jun 1, 2016 at 12:37 PM, Ashutosh Kumar <kmr.ashutos...@gmail.com> wrote:
> How are you packaging and deploying your jar ? I have tested with flink > and kafka .9 . It works fine for me . > > Thanks > Ashutosh > > On Wed, Jun 1, 2016 at 3:37 PM, ahmad Sa P <aspp...@gmail.com> wrote: > >> I did test it with Kafka 0.9.0.1, still the problem exists! >> >> On Wed, Jun 1, 2016 at 11:50 AM, Aljoscha Krettek <aljos...@apache.org> >> wrote: >> >>> The Flink Kafka Consumer was never tested with Kafka 0.10, could you try >>> it with 0.9. The 0.10 release is still very new and we have yet to provide >>> a consumer for that. >>> >>> On Wed, 1 Jun 2016 at 10:47 ahmad Sa P <aspp...@gmail.com> wrote: >>> >>>> Hi Aljoscha, >>>> I have tried different version of Flink V 1.0.0 and 1.0.3 and Kafka >>>> version 0.10.0.0. >>>> Ahmad >>>> >>>> >>>> >>>> On Wed, Jun 1, 2016 at 10:39 AM, Aljoscha Krettek <aljos...@apache.org> >>>> wrote: >>>> >>>>> This is unrelated to joda time or Kryo, that's just an info message in >>>>> the log. >>>>> >>>>> What version of Flink and Kafka are you using? >>>>> >>>>> >>>>> >>>>> On Wed, 1 Jun 2016 at 07:02 arpit srivastava <arpit8...@gmail.com> >>>>> wrote: >>>>> >>>>>> Flink uses kryo serialization which doesn't support joda time object >>>>>> serialization. >>>>>> >>>>>> Use java.util.date or you have to change kryo. >>>>>> >>>>>> Thanks, >>>>>> Arpit >>>>>> >>>>>> On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi >>>>>>> I have a problem at running a sample code from the hands-in examples >>>>>>> of Apache Flink, >>>>>>> I used the following code to send output of a stream to already >>>>>>> running Apache Kafka, and get the below error. Could anyone tell me >>>>>>> what is >>>>>>> going wrong? >>>>>>> >>>>>>> Best regards >>>>>>> Ahmad >>>>>>> >>>>>>> public class RideCleansing { >>>>>>> >>>>>>> private static final String LOCAL_KAFKA_BROKER = "localhost:9092"; >>>>>>> public static final String CLEANSED_RIDES_TOPIC = "mytopic"; >>>>>>> >>>>>>> >>>>>>> public static void main(String[] args) throws Exception { >>>>>>> >>>>>>> StreamExecutionEnvironment env = >>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>> >>>>>>> DataStream<TaxiRide> rides = env.addSource(new >>>>>>> TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f)); >>>>>>> >>>>>>> DataStream<TaxiRide> filteredRides = rides.filter(new >>>>>>> NYCFilter()); >>>>>>> >>>>>>> filteredRides.addSink(new >>>>>>> FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER, >>>>>>> CLEANSED_RIDES_TOPIC, >>>>>>> new TaxiRideSchema())); >>>>>>> >>>>>>> env.execute("Taxi Ride Cleansing"); >>>>>>> } >>>>>>> >>>>>>> Error: >>>>>>> 18:43:15,734 INFO org.apache.flink.api.java.typeutils.TypeExtractor >>>>>>> - class org.joda.time.DateTime is not a valid POJO type >>>>>>> Exception in thread "main" java.lang.NoClassDefFoundError: >>>>>>> kafka/producer/Partitioner >>>>>>> at java.lang.ClassLoader.defineClass1(Native Method) >>>>>>> at java.lang.ClassLoader.defineClass(ClassLoader.java:760) >>>>>>> at >>>>>>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) >>>>>>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:455) >>>>>>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73) >>>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:367) >>>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361) >>>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:360) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>> at >>>>>>> com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51) >>>>>>> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner >>>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:372) >>>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361) >>>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:360) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>> ... 13 more >>>>>>> >>>>>>> >>>>>> >>>> >> >