You need to initialize the decoder in the deserialize method instead
of in the constructor.
On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota wrote:
After deserialization, the constructor of a Java class is not necessarily
called. Thus, you should move the check
if (this.decoder == null) {
    this.decoder = new KafkaAvroDecoder(vProps);
}
into the deserialize method of MyAvroDeserializer.
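The effect is easy to reproduce with plain Java serialization, no Flink needed. In this sketch, `Decoder` and `MySchema` are hypothetical stand-ins for `KafkaAvroDecoder` and your `MyAvroDeserializer`:

```java
import java.io.*;

// Hypothetical stand-in for KafkaAvroDecoder (which is not serializable)
class Decoder {
    Decoder(String props) {}
    String decode(byte[] msg) { return new String(msg); }
}

// Mimics MyAvroDeserializer: the transient decoder is lost on serialization
class MySchema implements Serializable {
    private final String props;
    private transient Decoder decoder;

    MySchema(String props) { this.props = props; }

    String deserialize(byte[] msg) {
        if (decoder == null) {            // re-create after deserialization
            decoder = new Decoder(props);
        }
        return decoder.decode(msg);
    }
}

public class LazyInitDemo {
    // Serialize and deserialize, as Flink does when shipping the schema to workers
    static MySchema roundTrip(MySchema schema) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(schema);
        oos.flush();
        return (MySchema) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
    }

    public static void main(String[] args) throws Exception {
        MySchema shipped = roundTrip(new MySchema("schema.registry.url=..."));
        // Without the null check, decoder would be null here -> NullPointerException
        System.out.println(shipped.deserialize("hello".getBytes()));  // prints "hello"
    }
}
```

After the round trip, the transient field comes back null; the null check in deserialize() restores it.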
Cheers,
Till
On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota wrote:
Hi Max/Ewen,
Thank you for the inputs. I was able to solve the serialization issues. Now
I am seeing a NullPointerException.
public class MyAvroDeserializer implements DeserializationSchema {
    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        this.decoder = new KafkaAvroDecoder(vProps);
    }
Stephan is right, this should do it in deserialize():
if (decoder == null) {
    decoder = new KafkaAvroDecoder(vProps);
}
Further, you might have to specify the correct return type for
getProducedType(). You may use
public TypeInformation getProducedType() {
    return TypeExtractor.getForClass(...);
}
The KafkaAvroDecoder is not serializable, and Flink uses serialization to
distribute the code to the TaskManagers in the cluster.
I think you need to "lazily" initialize the decoder, in the first
invocation of "deserialize()". That should do it.
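The underlying issue can also be shown with plain Java serialization; `NonSerializableDecoder` below is a hypothetical stand-in for `KafkaAvroDecoder`:

```java
import java.io.*;

class NonSerializableDecoder {}  // like KafkaAvroDecoder: does not implement Serializable

// Holding the decoder in a plain field makes the whole schema unserializable
class BadSchema implements Serializable {
    NonSerializableDecoder decoder = new NonSerializableDecoder();
}

// Marking it transient lets the schema be serialized; the field arrives as null
class GoodSchema implements Serializable {
    transient NonSerializableDecoder decoder = new NonSerializableDecoder();
}

public class ShippingDemo {
    static boolean ships(Serializable obj) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(ships(new BadSchema()));   // false
        System.out.println(ships(new GoodSchema()));  // true
    }
}
```

That is why the decoder field must be transient, and why it then has to be re-created lazily after the schema arrives on the TaskManager.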
Stephan
On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota wrote:
Hi Max
Thanks for the example.
Based on your example, here is what I did:
public class Streamingkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
Hi Madhukar,
Thanks for your question. When you instantiate the FlinkKafkaConsumer, you
supply a DeserializationSchema in the constructor. You simply create a
class which implements DeserializationSchema and contains the
KafkaAvroDecoder configured with the schema registry.
Like so:
public class MyAvroDeserializer implements DeserializationSchema { ... }
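Filling in the elided body with hypothetical stand-ins (`DeserializationSchema`, `KafkaAvroDecoderStub`) for the real Flink and Confluent types, the pattern looks roughly like this:

```java
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-ins for the real Flink/Confluent types, just for illustration
interface DeserializationSchema<T> extends Serializable {
    T deserialize(byte[] message);
}

class KafkaAvroDecoderStub {              // stands in for KafkaAvroDecoder
    KafkaAvroDecoderStub(Properties props) {}
    Object fromBytes(byte[] bytes) { return new String(bytes); }
}

public class MyAvroDeserializer implements DeserializationSchema<Object> {
    private final Properties vProps;      // serializable configuration
    private transient KafkaAvroDecoderStub decoder;  // not serializable, so transient

    public MyAvroDeserializer(Properties vProps) {
        this.vProps = vProps;             // do NOT create the decoder here
    }

    @Override
    public Object deserialize(byte[] message) {
        if (decoder == null) {            // lazily created on the worker
            decoder = new KafkaAvroDecoderStub(vProps);
        }
        return decoder.fromBytes(message);
    }
}
```

The key points are keeping only serializable state in fields that are shipped, and creating the decoder lazily inside deserialize().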
Hi
I am very new to Avro. Currently I am using the Confluent Kafka distribution,
and I am able to write an Avro message to Kafka by storing the schema in the
schema registry. Now I need to consume those messages using the Flink Kafka
Consumer and am having a hard time deserializing them.
I am looking for an example.