Hi,
My impression is that MongoDB's API is not complicated. So you can
implement a MongoDB sink. Something like:
@Override
public void invoke(Row value, Context context) throws Exception {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
map.put(fieldNames[i], row.getField(i));
}
batch.add(new Document(map));
if (batch.size() >= conf.getBatchSize()) {
flush();
}
}
private void flush() {
if (batch.isEmpty()) {
return;
}
MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
MongoCollection<Document> mongoCollection =
mongoDatabase.getCollection(conf.getCollection());
mongoCollection.insertMany(batch);
batch.clear();
}
Best,
Jingsong Lee
On Wed, May 6, 2020 at 2:42 PM myflink <[email protected]> wrote:
> my solution:
> First, Flink sinks data to Kafka;
> Second, MongoDB reads data from Kafka. Over.
>
>
> ------------------ 原始邮件 ------------------
> *发件人:* "Aissa Elaffani"<[email protected]>;
> *发送时间:* 2020年5月6日(星期三) 下午3:17
> *收件人:* "user"<[email protected]>;
> *主题:* MongoDB sink;
>
> Hello ,
> I want to sink my data to MongoDB but as far as I know there is no sink
> connector to MongoDB. How can I implement a MongoDB sink ? If there is any
> other solutions, I hope you can share with me.
>
--
Best, Jingsong Lee