Hi,

My impression is that MongoDB's API is not complicated, so you can implement a MongoDB sink yourself. Something like:

    // batch, client, conf and fieldNames are fields of the sink class.
    @Override
    public void invoke(Row value, Context context) throws Exception {
        // Convert the row into a field-name -> value map.
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            map.put(fieldNames[i], value.getField(i));
        }
        batch.add(new Document(map));
        // Write once enough documents have accumulated.
        if (batch.size() >= conf.getBatchSize()) {
            flush();
        }
    }

    private void flush() {
        if (batch.isEmpty()) {
            return;
        }
        MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
        MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(conf.getCollection());
        mongoCollection.insertMany(batch);
        batch.clear();
    }

Best,
Jingsong Lee

On Wed, May 6, 2020 at 2:42 PM myflink <2644631...@qq.com> wrote:

> My solution:
> First, Flink sinks data to Kafka;
> Second, MongoDB reads data from Kafka. That's it.
>
> ------------------ Original Message ------------------
> From: "Aissa Elaffani"<aissaelaff...@gmail.com>;
> Sent: Wednesday, May 6, 2020, 3:17 PM
> To: "user"<user@flink.apache.org>;
> Subject: MongoDB sink;
>
> Hello,
> I want to sink my data to MongoDB, but as far as I know there is no sink
> connector for MongoDB. How can I implement a MongoDB sink? If there are any
> other solutions, I hope you can share them with me.
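
P.S. In case it helps, here is a minimal, self-contained sketch of just the batching logic from the sink above, with no Flink or MongoDB dependency so it can be run as-is. `BatchBuffer` and its `writer` callback are hypothetical names, not part of either library; in a real sink the callback would presumably be the `insertMany` call. The sketch also flushes on `close()`, on the assumption that a real sink needs this so the last partial batch is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not a Flink or MongoDB class): buffers records and hands them off in batches. */
final class BatchBuffer<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> writer; // assumption: e.g. docs -> collection.insertMany(docs)
    private final List<T> batch = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> writer) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    /** Called once per record, like invoke() in the sink. */
    void add(T record) {
        batch.add(record);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    /** Writes whatever is buffered; does nothing when the buffer is empty. */
    void flush() {
        if (batch.isEmpty()) {
            return;
        }
        // Pass a copy so that clearing the buffer cannot affect what the writer received.
        writer.accept(new ArrayList<>(batch));
        batch.clear();
    }

    /** Flushes the remaining partial batch on shutdown, like close() in the sink. */
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        // Batch size 2: "a" and "b" are written together, "c" is written on close.
        try (BatchBuffer<String> buffer =
                 new BatchBuffer<>(2, docs -> System.out.println("flushing " + docs))) {
            buffer.add("a");
            buffer.add("b");
            buffer.add("c");
        }
    }
}
```

With a batch size of 2 and three records, the writer is called twice: once when the second record arrives and once from `close()`. For stronger delivery guarantees one would probably also flush when Flink takes a checkpoint, but that is outside this sketch.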