Hi,

I have configured my connector in the following way:

    MongoDBSource.<T>builder()
        ...
        .deserializer(new MongoDeserializationSchema<T>(clazz))
        .build();

My class MongoDeserializationSchema is defined like this:

    public class MongoDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {
        ...
        private final Class<T> clazz;
        // transient: the converter is created lazily on first use
        private transient JsonConverter jsonConverter;

        public MongoDeserializationSchema(Class<T> clazz) {
            this.clazz = clazz;
        }

        public void deserialize(SourceRecord record, Collector<T> collector) {
            if (this.jsonConverter == null) {
                this.initializeJsonConverter();
            }
            try {
                byte[] bytes = jsonConverter.fromConnectData(
                        record.topic(), record.valueSchema(), record.value());
                T data = null;
                ... // deserialize to data from bytes
                if (data != null) {
                    collector.collect(data);
                }
            } ....
        }

        public TypeInformation<T> getProducedType() {
            return Types.POJO(clazz);
        }

        private void initializeJsonConverter() {
            this.jsonConverter = new JsonConverter();
            HashMap<String, Object> configs = new HashMap<>(2);
            configs.put("converter.type", ConverterType.VALUE.getName());
            configs.put("schemas.enable", false);
            this.jsonConverter.configure(configs);
        }
    }

So I am using org.apache.kafka.connect.json.JsonConverter to deserialize the SourceRecord to my data type T.

This works fine, but when a source record contains a long number, the number field is formatted like:

    {"field": {"$numberLong": "0"}}

This breaks deserialization, because the POJO expects:

    "field": 0

I have read somewhere that one needs to specify:

    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"

so that the source record formats the full document as regular JSON, but I am not sure how or where to specify this in my configuration.

Can anyone help me with this?

Thanks
Sachin
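
For reference, the mismatch described above ({"$numberLong": "0"} versus a plain 0) could also be handled on the deserializer side, independent of any formatter setting. The following is only a minimal sketch of that idea, not the connector's own mechanism. It assumes the JSON bytes have already been parsed into nested java.util.Map and java.util.List values by whatever JSON library is in use. The class name ExtendedJsonUnwrapper and the method unwrap are hypothetical names introduced for illustration, and only the $numberLong wrapper is handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of any connector API): walks a parsed JSON
// tree and replaces {"$numberLong": "<digits>"} wrappers with plain Long values.
final class ExtendedJsonUnwrapper {

    private ExtendedJsonUnwrapper() {
    }

    static Object unwrap(Object node) {
        if (node instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) node;
            // A wrapper is an object whose only key is "$numberLong".
            if (map.size() == 1 && map.containsKey("$numberLong")) {
                return Long.parseLong(String.valueOf(map.get("$numberLong")));
            }
            // Otherwise rebuild the object, unwrapping each value recursively.
            Map<String, Object> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                result.put(String.valueOf(entry.getKey()), unwrap(entry.getValue()));
            }
            return result;
        }
        if (node instanceof List) {
            List<Object> result = new ArrayList<>();
            for (Object item : (List<?>) node) {
                result.add(unwrap(item));
            }
            return result;
        }
        // Strings, numbers, booleans and null pass through unchanged.
        return node;
    }

    public static void main(String[] args) {
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("$numberLong", "0");
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("field", wrapper);
        System.out.println(unwrap(document)); // prints {field=0}
    }
}
```

If Jackson happens to be the JSON library in use (an assumption, since the original snippet elides that step), the bytes could be read with ObjectMapper.readValue(bytes, Map.class), passed through unwrap, and then mapped onto the POJO with ObjectMapper.convertValue(unwrapped, clazz). Other Extended JSON wrappers such as $date or $oid would need similar handling.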