Hi Sachin,

It is recommended to use org.bson.Document to convert MongoDB Extended JSON 
into Java types, and then perform further field mapping.

--------------------------------------------------------------------------------------------------------
             .deserializer(new DebeziumDeserializationSchema<Document>() {
                            @Override
                            public void deserialize(SourceRecord record, 
Collector<Document> out) {
                                Optional.ofNullable(record)
                                        .map(SourceRecord::value)
                                        .map(Struct.class::cast)
                                        .map(struct -> 
struct.getString("fullDocument"))
                                        .map(Document::parse)
                                        // mapping to other class types
                                        .ifPresent(out::collect);
                            }

                            @Override
                            public TypeInformation<Document> getProducedType() {
                                return Types.GENERIC(Document.class);
                            }
                        })
--------------------------------------------------------------------------------------------------------

Best,
Jiabao

On 2024/08/19 07:03:07 Sachin Mittal wrote:
> Hi,
> I have configured my connector in following way:
> 
> MongoDBSource.<T>builder()
>     ...
>     .deserializer(new MongoDeserializationSchema<T>(clazz))
>     .build();
> 
> 
> My class MongoDeserializationSchema is defined like:
> 
> public class MongoDeserializationSchema<T> implements
> DebeziumDeserializationSchema<T> {
>   ...
>   private final Class<T> clazz;
>   private transient JsonConverter jsonConverter;
> 
>   public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }
> 
>   public void deserialize(SourceRecord record, Collector<T> collector) {
>     if (this.jsonConverter == null) {
>       this.initializeJsonConverter();
>     }
>     try {
>       byte[] bytes =
>           jsonConverter.fromConnectData(record.topic(),
> record.valueSchema(), record.value());
>       T data = null;
>       ... // deserialize to data from bytes
>       if (data != null) {
>         collector.collect(data);
>       }
>     }
> 
>      ....
>   }
> 
>   public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }
> 
>   private void initializeJsonConverter() {
>     this.jsonConverter = new JsonConverter();
>     HashMap<String, Object> configs = new HashMap(2);
>     configs.put("converter.type", ConverterType.VALUE.getName());
>     configs.put("schemas.enable", false);
>     this.jsonConverter.configure(configs);
>   }
> }
> 
> 
> So I am using org.apache.kafka.connect.json.JsonConverter to
> deserialize SourceRecord to by data type T
> 
> This is working fine, but in case source records contains a long
> number it formats number fields like:
> 
> {"field": {"$numberLong": "0"}
> 
> It breaks as POJO would have expected "field": 0
> 
> Somewhere I have read that one needs to specify:
> 
> "output.json.formatter":
> "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
> 
> So that source record formats the full document like a regular JSON,
> but I am not sure how and where I can specify this in my
> configuration.
> 
> Can anyone help me with this?
> 
> 
> Thanks
> 
> Sachin
> 

Reply via email to