Hi,

My impression is that MongoDB's API is not complicated. So you can
implement a MongoDB sink. Something like:

@Override
public void invoke(Row value, Context context) throws Exception {
    Map<String, Object> map = new HashMap<>();
    for (int i = 0; i < fieldNames.length; i++) {
        map.put(fieldNames[i], row.getField(i));
    }
    batch.add(new Document(map));
    if (batch.size() >= conf.getBatchSize()) {
        flush();
    }
}

private void flush() {
    if (batch.isEmpty()) {
        return;
    }
    MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
    MongoCollection<Document> mongoCollection =
mongoDatabase.getCollection(conf.getCollection());
    mongoCollection.insertMany(batch);
    batch.clear();
}

Best,
Jingsong Lee

On Wed, May 6, 2020 at 2:42 PM myflink <2644631...@qq.com> wrote:

> my solution:
> First, Flink sinks data to Kafka;
> Second, MongoDB reads data from Kafka. Over.
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Aissa Elaffani"<aissaelaff...@gmail.com>;
> *发送时间:* 2020年5月6日(星期三) 下午3:17
> *收件人:* "user"<user@flink.apache.org>;
> *主题:* MongoDB sink;
>
> Hello ,
>  I want to sink my data to MongoDB but as far as I know there is no sink
> connector to MongoDB.  How can I implement a MongoDB sink ? If there is any
> other solutions, I hope you can share with me.
>


-- 
Best, Jingsong Lee

Reply via email to