This worked!!!

Thanks
Sachin


On Mon, Aug 19, 2024 at 3:06 PM Jiabao Sun <jiabao...@apache.org> wrote:

> Hi Sachin,
>
> It is recommended to use org.bson.Document to convert MongoDB Extended
> JSON into Java types, and then perform further field mapping.
>
>
> --------------------------------------------------------------------------------------------------------
> .deserializer(new DebeziumDeserializationSchema<Document>() {
>     @Override
>     public void deserialize(SourceRecord record, Collector<Document> out) {
>         Optional.ofNullable(record)
>                 .map(SourceRecord::value)
>                 .map(Struct.class::cast)
>                 .map(struct -> struct.getString("fullDocument"))
>                 .map(Document::parse)
>                 // mapping to other class types
>                 .ifPresent(out::collect);
>     }
>
>     @Override
>     public TypeInformation<Document> getProducedType() {
>         return Types.GENERIC(Document.class);
>     }
> })
>
> --------------------------------------------------------------------------------------------------------
>
> Best,
> Jiabao
>
> On 2024/08/19 07:03:07 Sachin Mittal wrote:
> > Hi,
> > I have configured my connector in the following way:
> >
> > MongoDBSource.<T>builder()
> >     ...
> >     .deserializer(new MongoDeserializationSchema<T>(clazz))
> >     .build();
> >
> >
> > My class MongoDeserializationSchema is defined like:
> >
> > public class MongoDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {
> >   ...
> >   private final Class<T> clazz;
> >   private transient JsonConverter jsonConverter;
> >
> >   public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }
> >
> >   public void deserialize(SourceRecord record, Collector<T> collector) {
> >     if (this.jsonConverter == null) {
> >       this.initializeJsonConverter();
> >     }
> >     try {
> >       byte[] bytes =
> >           jsonConverter.fromConnectData(
> >               record.topic(), record.valueSchema(), record.value());
> >       T data = null;
> >       ... // deserialize to data from bytes
> >       if (data != null) {
> >         collector.collect(data);
> >       }
> >     }
> >
> >      ....
> >   }
> >
> >   public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }
> >
> >   private void initializeJsonConverter() {
> >     this.jsonConverter = new JsonConverter();
> >     HashMap<String, Object> configs = new HashMap<>(2);
> >     configs.put("converter.type", ConverterType.VALUE.getName());
> >     configs.put("schemas.enable", false);
> >     this.jsonConverter.configure(configs);
> >   }
> > }
> >
> >
> > So I am using org.apache.kafka.connect.json.JsonConverter to
> > deserialize the SourceRecord to my data type T.
> >
> > This works fine, but when a source record contains a long
> > number, the number field is formatted like:
> >
> > {"field": {"$numberLong": "0"}}
> >
> > This breaks because the POJO expects "field": 0
> >
> > Somewhere I have read that one needs to specify:
> >
> > "output.json.formatter":
> > "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
> >
> > so that the source record formats the full document as regular JSON,
> > but I am not sure how and where I can specify this in my
> > configuration.
> >
> > Can anyone help me with this?
> >
> >
> > Thanks
> >
> > Sachin
> >
>
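
The mismatch discussed in this thread, where a long arrives as {"$numberLong": "0"} while the POJO expects a plain 0, can be illustrated without any MongoDB dependency. The sketch below is a hand-rolled illustration, not how org.bson.Document is implemented: it assumes the JSON has already been parsed into nested Map and List objects, the names ExtendedJsonUnwrap and unwrap are hypothetical, and only three Extended JSON number wrappers are handled. In a real job, Document.parse as suggested by Jiabao is the simpler route.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExtendedJsonUnwrap {

    // Recursively replaces single-key wrappers such as {"$numberLong": "0"}
    // with plain Java values. Hypothetical helper, for illustration only.
    static Object unwrap(Object node) {
        if (node instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) node;
            if (map.size() == 1) {
                Map.Entry<?, ?> only = map.entrySet().iterator().next();
                String key = String.valueOf(only.getKey());
                String text = String.valueOf(only.getValue());
                switch (key) {
                    case "$numberLong":
                        return Long.parseLong(text);
                    case "$numberInt":
                        return Integer.parseInt(text);
                    case "$numberDouble":
                        return Double.parseDouble(text);
                    default:
                        break; // not a number wrapper, treat as an ordinary object
                }
            }
            // Ordinary object: keep the keys, unwrap each value.
            Map<String, Object> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                result.put(String.valueOf(entry.getKey()), unwrap(entry.getValue()));
            }
            return result;
        }
        if (node instanceof List) {
            List<Object> result = new ArrayList<>();
            for (Object item : (List<?>) node) {
                result.add(unwrap(item));
            }
            return result;
        }
        return node; // strings, booleans, nulls and plain numbers pass through
    }

    public static void main(String[] args) {
        // Equivalent of {"field": {"$numberLong": "0"}} after generic JSON parsing.
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("$numberLong", "0");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("field", wrapper);

        System.out.println(unwrap(doc)); // prints {field=0}
    }
}
```

After unwrapping, "field" holds a java.lang.Long, which is the shape a POJO mapper expects.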
