This worked!!!

Thanks
Sachin
On Mon, Aug 19, 2024 at 3:06 PM Jiabao Sun <jiabao...@apache.org> wrote:

> Hi Sachin,
>
> It is recommended to use org.bson.Document to convert MongoDB Extended
> JSON into Java types, and then perform further field mapping.
>
> --------------------------------------------------------------------------------------------------------
> .deserializer(new DebeziumDeserializationSchema<Document>() {
>     @Override
>     public void deserialize(SourceRecord record, Collector<Document> out) {
>         Optional.ofNullable(record)
>                 .map(SourceRecord::value)
>                 .map(Struct.class::cast)
>                 .map(struct -> struct.getString("fullDocument"))
>                 .map(Document::parse)
>                 // mapping to other class types
>                 .ifPresent(out::collect);
>     }
>
>     @Override
>     public TypeInformation<Document> getProducedType() {
>         return Types.GENERIC(Document.class);
>     }
> })
> --------------------------------------------------------------------------------------------------------
>
> Best,
> Jiabao
>
> On 2024/08/19 07:03:07 Sachin Mittal wrote:
> > Hi,
> > I have configured my connector in the following way:
> >
> > MongoDBSource.<T>builder()
> >     ...
> >     .deserializer(new MongoDeserializationSchema<T>(clazz))
> >     .build();
> >
> > My class MongoDeserializationSchema is defined like:
> >
> > public class MongoDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {
> >     ...
> >     private final Class<T> clazz;
> >     private transient JsonConverter jsonConverter;
> >
> >     public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }
> >
> >     public void deserialize(SourceRecord record, Collector<T> collector) {
> >         if (this.jsonConverter == null) {
> >             this.initializeJsonConverter();
> >         }
> >         try {
> >             byte[] bytes = jsonConverter.fromConnectData(
> >                     record.topic(), record.valueSchema(), record.value());
> >             T data = null;
> >             ... // deserialize to data from bytes
> >             if (data != null) {
> >                 collector.collect(data);
> >             }
> >         }
> >         ....
> >     }
> >
> >     public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }
> >
> >     private void initializeJsonConverter() {
> >         this.jsonConverter = new JsonConverter();
> >         HashMap<String, Object> configs = new HashMap<>(2);
> >         configs.put("converter.type", ConverterType.VALUE.getName());
> >         configs.put("schemas.enable", false);
> >         this.jsonConverter.configure(configs);
> >     }
> > }
> >
> > So I am using org.apache.kafka.connect.json.JsonConverter to
> > deserialize SourceRecord to my data type T.
> >
> > This is working fine, but when a source record contains a long
> > number, it formats number fields like:
> >
> > {"field": {"$numberLong": "0"}}
> >
> > It breaks, as the POJO would have expected "field": 0
> >
> > Somewhere I have read that one needs to specify:
> >
> > "output.json.formatter":
> > "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
> >
> > so that the source record formats the full document like regular JSON,
> > but I am not sure how and where I can specify this in my
> > configuration.
> >
> > Can anyone help me with this?
> >
> > Thanks
> > Sachin
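
For anyone landing on this thread later, here is a rough sketch of why the
POJO mapping broke on {"$numberLong": "0"} and what the conversion to Java
types amounts to for that one wrapper. It uses only the JDK and works on
already-parsed maps. The class and method names (ExtendedJsonUnwrap, unwrap)
are made up for illustration, and this is not how org.bson implements it. In
a real job, Document.parse as suggested above is the better choice, since it
also handles the other Extended JSON wrappers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
public class ExtendedJsonUnwrap {

    /**
     * Recursively replaces {"$numberLong": "<digits>"} wrappers with java.lang.Long.
     * Other Extended JSON wrappers ($date, $oid, ...) are left untouched in this sketch.
     */
    public static Object unwrap(Object value) {
        if (value instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) value;
            // A single-key map whose key is "$numberLong" is a wrapped 64-bit integer.
            if (map.size() == 1 && map.containsKey("$numberLong")) {
                return Long.parseLong(String.valueOf(map.get("$numberLong")));
            }
            // Any other map: keep the keys and unwrap each value.
            Map<String, Object> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : map.entrySet()) {
                result.put(String.valueOf(e.getKey()), unwrap(e.getValue()));
            }
            return result;
        }
        if (value instanceof List) {
            List<Object> result = new ArrayList<>();
            for (Object item : (List<?>) value) {
                result.add(unwrap(item));
            }
            return result;
        }
        // Strings, numbers, booleans and null pass through unchanged.
        return value;
    }

    public static void main(String[] args) {
        // Parsed form of {"field": {"$numberLong": "0"}}
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("$numberLong", "0");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("field", wrapper);

        System.out.println(unwrap(doc)); // prints {field=0}
    }
}
```

After unwrapping, "field" holds a plain Long, which is the shape a POJO
mapper expects for a long field.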