The Flink Kafka connector was never tested with Kafka 0.10; could you try it
with 0.9? The 0.10 release is still very new, and we have yet to provide a
connector for it.
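
For reference, here is a rough, untested sketch of what a 0.9 setup could look
like. It assumes the flink-connector-kafka-0.9 dependency is on the classpath;
FlinkKafkaProducer09 and SimpleStringSchema are the class names from that
connector, so please double-check them against the docs for your Flink version:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.fromElements("ride-1", "ride-2");

        // The 0.9 producer takes (broker list, topic, serialization schema),
        // the same shape as the producer used in the exercise code below.
        lines.addSink(new FlinkKafkaProducer09<>("localhost:9092",
                "mytopic",
                new SimpleStringSchema()));

        env.execute("Kafka 0.9 sink sketch");
    }
}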

On Wed, 1 Jun 2016 at 10:47 ahmad Sa P <aspp...@gmail.com> wrote:

> Hi Aljoscha,
> I have tried different versions of Flink (1.0.0 and 1.0.3) with Kafka
> version 0.10.0.0.
> Ahmad
>
>
>
> On Wed, Jun 1, 2016 at 10:39 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> This is unrelated to Joda-Time or Kryo; that's just an info message in
>> the log.
>>
>> What version of Flink and Kafka are you using?
>>
>>
>>
>> On Wed, 1 Jun 2016 at 07:02 arpit srivastava <arpit8...@gmail.com> wrote:
>>
>>> Flink falls back to Kryo serialization for this type, and Kryo doesn't
>>> support Joda-Time objects out of the box.
>>>
>>> Use java.util.Date, or change the Kryo configuration.
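>>>
>>> As a rough, untested sketch: assuming you add the de.javakaffee
>>> "kryo-serializers" library as a dependency (JodaDateTimeSerializer comes
>>> from that library, not from Flink), you could register it with Flink's
>>> Kryo fallback roughly like this:
>>>
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.joda.time.DateTime;
>>> import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
>>>
>>> public class JodaKryoRegistration {
>>>     public static void main(String[] args) {
>>>         StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // Tell Flink's Kryo fallback how to (de)serialize Joda DateTime.
>>>         env.getConfig().addDefaultKryoSerializer(
>>>                 DateTime.class, JodaDateTimeSerializer.class);
>>>     }
>>> }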
>>>
>>> Thanks,
>>> Arpit
>>>
>>> On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote:
>>>
>>>> Hi
>>>> I have a problem running a sample from the hands-on examples for
>>>> Apache Flink.
>>>> I used the following code to send the output of a stream to an already
>>>> running Apache Kafka broker, and I get the error below. Could anyone tell
>>>> me what is going wrong?
>>>>
>>>> Best regards
>>>> Ahmad
>>>>
>>>> public class RideCleansing {
>>>>
>>>>     private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>>>>     public static final String CLEANSED_RIDES_TOPIC = "mytopic";
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         DataStream<TaxiRide> rides = env.addSource(
>>>>                 new TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f));
>>>>
>>>>         DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter());
>>>>
>>>>         filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER,
>>>>                 CLEANSED_RIDES_TOPIC,
>>>>                 new TaxiRideSchema()));
>>>>
>>>>         env.execute("Taxi Ride Cleansing");
>>>>     }
>>>> }
>>>>
>>>> Error:
>>>> 18:43:15,734 INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.joda.time.DateTime is not a valid POJO type
>>>> Exception in thread "main" java.lang.NoClassDefFoundError: kafka/producer/Partitioner
>>>>     at java.lang.ClassLoader.defineClass1(Native Method)
>>>>     at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
>>>>     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>>>     at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
>>>>     at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     at com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51)
>>>> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     ... 13 more
>>>>
>>>>
>>>
>
