Hi Sachin,

The recommended approach is to parse the MongoDB Extended JSON with org.bson.Document, which converts it into Java types, and then perform any further field mapping from there.
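
As a minimal sketch of that conversion outside Flink: the class and method names here (ExtendedJsonDemo, readLong, toRelaxedJson) are made up for illustration, and it assumes the org.bson classes from the MongoDB Java driver are on your classpath. It parses the value from your example and reads the field back as a plain long. It also shows re-serializing in relaxed mode, in case you would rather keep a JSON-to-POJO mapping step.

```java
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

// Hypothetical demo class, not part of the connector.
public class ExtendedJsonDemo {

    // Parses an Extended JSON string; a {"$numberLong": "..."} wrapper
    // is decoded into a java.lang.Long by Document.parse.
    public static Long readLong(String extendedJson, String field) {
        return Document.parse(extendedJson).getLong(field);
    }

    // Re-serializes the parsed document in relaxed mode, where 64-bit
    // integers are written as plain JSON numbers.
    public static String toRelaxedJson(String extendedJson) {
        JsonWriterSettings relaxed = JsonWriterSettings.builder()
                .outputMode(JsonMode.RELAXED)
                .build();
        return Document.parse(extendedJson).toJson(relaxed);
    }

    public static void main(String[] args) {
        String fullDocument = "{\"field\": {\"$numberLong\": \"0\"}}";
        System.out.println(readLong(fullDocument, "field"));
        System.out.println(toRelaxedJson(fullDocument));
    }
}
```

Note that, as far as I know, relaxed mode still keeps wrappers for some types such as ObjectId and dates, so mapping from the Document directly is usually the safer route. The same Document.parse call is what the deserializer below applies to the fullDocument field.
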
--------------------------------------------------------------------------------------------------------
.deserializer(new DebeziumDeserializationSchema<Document>() {
    @Override
    public void deserialize(SourceRecord record, Collector<Document> out) {
        Optional.ofNullable(record)
                .map(SourceRecord::value)
                .map(Struct.class::cast)
                .map(struct -> struct.getString("fullDocument"))
                .map(Document::parse)
                // mapping to other class types
                .ifPresent(out::collect);
    }

    @Override
    public TypeInformation<Document> getProducedType() {
        return Types.GENERIC(Document.class);
    }
})
--------------------------------------------------------------------------------------------------------

Best,
Jiabao

On 2024/08/19 07:03:07 Sachin Mittal wrote:
> Hi,
> I have configured my connector in following way:
>
> MongoDBSource.<T>builder()
>     ...
>     .deserializer(new MongoDeserializationSchema<T>(clazz))
>     .build();
>
>
> My class MongoDeserializationSchema is defined like:
>
> public class MongoDeserializationSchema<T> implements
>         DebeziumDeserializationSchema<T> {
>     ...
>     private final Class<T> clazz;
>     private transient JsonConverter jsonConverter;
>
>     public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }
>
>     public void deserialize(SourceRecord record, Collector<T> collector) {
>         if (this.jsonConverter == null) {
>             this.initializeJsonConverter();
>         }
>         try {
>             byte[] bytes =
>                 jsonConverter.fromConnectData(record.topic(),
>                     record.valueSchema(), record.value());
>             T data = null;
>             ... // deserialize to data from bytes
>             if (data != null) {
>                 collector.collect(data);
>             }
>         }
>
>         ....
>     }
>
>     public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }
>
>     private void initializeJsonConverter() {
>         this.jsonConverter = new JsonConverter();
>         HashMap<String, Object> configs = new HashMap<>(2);
>         configs.put("converter.type", ConverterType.VALUE.getName());
>         configs.put("schemas.enable", false);
>         this.jsonConverter.configure(configs);
>     }
> }
>
>
> So I am using org.apache.kafka.connect.json.JsonConverter to
> deserialize SourceRecord to my data type T
>
> This is working fine, but in case a source record contains a long
> number it formats number fields like:
>
> {"field": {"$numberLong": "0"}}
>
> It breaks, as the POJO would have expected "field": 0
>
> Somewhere I have read that one needs to specify:
>
> "output.json.formatter":
> "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
>
> So that the source record formats the full document like regular JSON,
> but I am not sure how and where I can specify this in my
> configuration.
>
> Can anyone help me with this?
>
>
> Thanks
>
> Sachin
>