Hi,
I have configured my connector in the following way:

MongoDBSource.<T>builder()
    ...
    .deserializer(new MongoDeserializationSchema<T>(clazz))
    .build();


My MongoDeserializationSchema class is defined as follows:

public class MongoDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {
  ...
  private final Class<T> clazz;
  private transient JsonConverter jsonConverter;

  public MongoDeserializationSchema(Class<T> clazz) { this.clazz = clazz; }

  public void deserialize(SourceRecord record, Collector<T> collector) {
    if (this.jsonConverter == null) {
      this.initializeJsonConverter();
    }
    try {
      byte[] bytes =
          jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
      T data = null;
      ... // deserialize to data from bytes
      if (data != null) {
        collector.collect(data);
      }
    }

     ....
  }

  public TypeInformation<T> getProducedType() { return Types.POJO(clazz); }

  private void initializeJsonConverter() {
    this.jsonConverter = new JsonConverter();
    HashMap<String, Object> configs = new HashMap<>(2);
    configs.put("converter.type", ConverterType.VALUE.getName());
    configs.put("schemas.enable", false);
    this.jsonConverter.configure(configs);
  }
}


So I am using org.apache.kafka.connect.json.JsonConverter to
deserialize the SourceRecord into my data type T.

This works fine, but when a source record contains a long number,
the number field is formatted like this:

{"field": {"$numberLong": "0"}}

This breaks the POJO mapping, because the POJO expects "field": 0.
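Until the formatter setting is sorted out, one possible stopgap is to rewrite the `$numberLong` wrappers into plain numbers on the JSON text before mapping it to the POJO. This is a minimal sketch, assuming the converter output has exactly the `{"$numberLong": "<digits>"}` shape shown above; `NumberLongUnwrapper` and `unwrap` are hypothetical names, not part of Flink CDC, Kafka Connect, or the MongoDB connector.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of any library: turns canonical Extended JSON
// Int64 wrappers like {"$numberLong": "0"} into plain JSON numbers.
public final class NumberLongUnwrapper {

    // Matches {"$numberLong": "<optional minus><digits>"} with optional whitespace.
    private static final Pattern NUMBER_LONG =
            Pattern.compile("\\{\\s*\"\\$numberLong\"\\s*:\\s*\"(-?\\d+)\"\\s*\\}");

    private NumberLongUnwrapper() {}

    public static String unwrap(String json) {
        // Replace each wrapper with its captured digits. Using "$1" as the
        // replacement is safe because group 1 only holds digits and a sign.
        return NUMBER_LONG.matcher(json).replaceAll("$1");
    }

    public static void main(String[] args) {
        String in = "{\"field\": {\"$numberLong\": \"0\"}}";
        System.out.println(unwrap(in)); // prints {"field": 0}
    }
}
```

In `deserialize`, this would be applied to `new String(bytes, StandardCharsets.UTF_8)` before the elided POJO mapping step. A regex is blunt: it would also rewrite a string value that happens to contain that literal text, and it leaves other Extended JSON wrappers (for example `$date` or `$oid`) untouched, so having the source emit plain JSON numbers in the first place is the cleaner fix if it can be configured.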

I have read somewhere that one needs to specify:

"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"

so that the source record formats the full document as regular JSON,
but I am not sure how or where to specify this in my configuration.

Can anyone help me with this?


Thanks

Sachin
