Hi Nuno, In general, if it is possible, it is recommended that you map your generic classes to Tuples / POJOs [1]. For Tuples / POJOs, Flink will create specialized serializers for them, whereas for generic classes (i.e. types which cannot be treated as POJOs) Flink simply fallbacks to using Kryo for them. The actual performance gain may depend a bit on what the original generic class type looked like.
One other thing probably to look at is enabling object reuse for de-/serialization. However, be aware that the user code needs to be aware of this, otherwise it may lead to unexpected errors. Cheers, Gordon [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types On 20 June 2017 at 11:24:03 PM, Nuno Rafael Goncalves ( nuno.goncal...@wedotechnologies.com) wrote: I believe there are some performance impact while de/serializing, which is “normal”. What I’m trying to understand is if there are any tips to improve this process. For instance, tuples vs general class types. Do you know if it’s worth it to map a custom object into tuple just for de/serialization process? According to jfr analysis, kryo methods are hit a lot. -----Original Message----- From: Nico Kruber [mailto:n...@data-artisans.com] Sent: 20 de junho de 2017 16:04 To: user@flink.apache.org Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com> Subject: Re: Kafka and Flink integration No, this is only necessary if you want to register a custom serializer itself [1]. Also, in case you are wondering about registerKryoType() - this is only needed as a performance optimisation. What exactly is your problem? What are you trying to solve? (I can't read JFR files here, and from what I read at Oracle's site, this requires a commercial license, too...) Nico [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/ custom_serializers.html On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote: > Do I need to use registerTypeWithKryoSerializer() in my execution > environment? > My serialization into kafka is done with the following snippet > > try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream(); > Output output = new Output(byteArrayOutStream)) { > Kryo kryo = new Kryo(); > kryo.writeClassAndObject(output, event); > output.flush(); > return byteArrayOutStream.toByteArray(); > } catch (IOException e) { > return null; > } > > "event" is my custom object. > > then i desirialize it in flink's kafka consumer > try (ByteArrayInputStream byteArrayInStream = new > ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream, > bytes.length)) { > Kryo kryo = new Kryo(); > return kryo.readClassAndObject(input); > } catch (IOException e) { > return null; > } > > Thanks > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-a > nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink User > Mailing List archive. mailing list archive at Nabble.com.
image003.jpg@01D2E9E1.26D2D370
Description: Binary data