Hi Oran,

As you've already suggested, you could just use a (flat)map function that takes an ObjectNode and outputs a string. In the mapper, you can handle an invalid object however you want: log it, discard it, write an "error json string", write it to a side output stream (which requires a ProcessFunction rather than a plain map function), and so on.
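A minimal sketch of such a function, assuming the log-and-discard option. The class name JsonToStringFlatMap is made up for illustration, and the imports assume plain Jackson; depending on your setup, the ObjectNode produced by JSONKeyValueDeserializationSchema may come from Flink's shaded Jackson package instead, in which case the imports would need to change accordingly.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper class, not part of Flink.
public class JsonToStringFlatMap implements FlatMapFunction<ObjectNode, String> {
    private static final Logger LOG = LoggerFactory.getLogger(JsonToStringFlatMap.class);

    // Static, so the mapper is not serialized together with the function instance.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void flatMap(ObjectNode value, Collector<String> out) {
        try {
            // Emit exactly one JSON string per successfully serialized record.
            out.collect(MAPPER.writeValueAsString(value));
        } catch (JsonProcessingException e) {
            // Log-and-discard: emitting nothing drops the record from the stream.
            // Collecting an "error json string" here would be the alternative.
            LOG.warn("Dropping record that could not be serialized", e);
        }
    }
}
```

It would be applied as nodes.flatMap(new JsonToStringFlatMap()), where nodes is an assumed DataStream<ObjectNode>. Using flatMap rather than map is what allows a failed record to produce no output at all.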
On Tue, Jan 25, 2022 at 12:38 PM Oran Shuster <oran.shus...@houzz.com> wrote:

> In the documentation we have an example on how to implement
> deserialization from bytes to Jackson ObjectNode objects
> - JSONKeyValueDeserializationSchema
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
>
> However, there is no example on the other direction: Taking an
> ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
> it to string
>
> You can write a simple schema like so
>
> public class JSONKafkaSerializationSchema implements KafkaSerializationSchema<JsonNode> {
>     private final ObjectMapper objectMapper = new ObjectMapper();
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(JsonNode element, @Nullable Long timestamp) {
>         String topic = getTargetTopic(element);
>
>         byte[] value;
>
>         try {
>             value = objectMapper.writeValueAsBytes(element);
>             return new ProducerRecord<>(topic, value);
>         } catch (JsonProcessingException e) {
>             return null;
>         }
>     }
>
>     private String getTargetTopic(JsonNode jsonNode) {
>         return jsonNode.get("topic").asText();
>     }
> }
>
> But this raises a question - What to do when a serialization fails?
> if the input class is a simple POJO then Jackson should always succeed in
> converting to bytes but that's not 100% guaranteed.
> In case of failures, can we return null and the record will be discarded?
> Null values are discarded in the case of the deserialization schema, from
> the documentation - "Returns: The deserialized message as an object (null
> if the message cannot be deserialized)."
> If this is not possible, what is the proper way to serialize Jackson
> objects into bytes in flink? It's possible to convert everything to String
> before the kafka producer but then any logic to determine the topic we need
> to send to will need to deserialize the string again