Hi Oran,

as you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a string.
In the mapper, you can do whatever you want in case of an invalid object:
logging about it, discarding it, writing an "error json string", writing to
a side output stream, ...


On Tue, Jan 25, 2022 at 12:38 PM Oran Shuster <oran.shus...@houzz.com>
wrote:

> In the documentation we have an example on how to implement
> deserialization from bytes to Jackson ObjectNode objects
> - JSONKeyValueDeserializationSchema
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
>
> However, there is no example on the other direction: Taking an
> ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
> it to string
>
> You can write a simple schema like so
>
>
> public class JSONKafkaSerializationSchema implements
> KafkaSerializationSchema<JsonNode> {
>     private final ObjectMapper objectMapper = new ObjectMapper();
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
> @Nullable Long timestamp) {
>         String topic = getTargetTopic(element);
>
>         byte[] value;
>
>         try {
>             value = objectMapper.writeValueAsBytes(element);
>             return new ProducerRecord<>(topic, value);
>         } catch (JsonProcessingException e) {
>             return null;
>         }
>     }
>
>     private String getTargetTopic(JsonNode jsonNode) {
>         return jsonNode.get("topic").asText();
>     }
> }
>
> But this raises a question - What to do when a serialization fails?
> if the input class is a simple POJO then Jackson should always succeed in
> converting to bytes but that's not 100% guaranteed.
> In case of failures, can we return null and the record will be discarded?
> Null values are discarded in the case of the deserialization schema, from
> the documentation - "Returns: The deserialized message as an object (null
> if the message cannot be deserialized)."
> If this is not possible, what is the proper way to serialize Jackson
> objets into bytes in flink? Its possible to convert everything to String
> before the kafka producer but then any logic to determine the topic we need
> to send to will need to deserialize the string again
>

Reply via email to