I'm also not aware of a MongoDB sink in Flink. The code provided by Jingsong applies to the "SinkFunction" interface of Flink. That's a good starting point to implement a custom Sink.
On Wed, May 6, 2020 at 9:46 AM Jingsong Li <jingsongl...@gmail.com> wrote: > Hi, > > My impression is that MongoDB's API is not complicated. So you can > implement a MongoDB sink. Something like: > > @Override > public void invoke(Row value, Context context) throws Exception { > Map<String, Object> map = new HashMap<>(); > for (int i = 0; i < fieldNames.length; i++) { > map.put(fieldNames[i], row.getField(i)); > } > batch.add(new Document(map)); > if (batch.size() >= conf.getBatchSize()) { > flush(); > } > } > > private void flush() { > if (batch.isEmpty()) { > return; > } > MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase()); > MongoCollection<Document> mongoCollection = > mongoDatabase.getCollection(conf.getCollection()); > mongoCollection.insertMany(batch); > batch.clear(); > } > > Best, > Jingsong Lee > > On Wed, May 6, 2020 at 2:42 PM myflink <2644631...@qq.com> wrote: > >> my solution: >> First, Flink sinks data to Kafka; >> Second, MongoDB reads data from Kafka. Over. >> >> >> ------------------ 原始邮件 ------------------ >> *发件人:* "Aissa Elaffani"<aissaelaff...@gmail.com>; >> *发送时间:* 2020年5月6日(星期三) 下午3:17 >> *收件人:* "user"<user@flink.apache.org>; >> *主题:* MongoDB sink; >> >> Hello , >> I want to sink my data to MongoDB but as far as I know there is no sink >> connector to MongoDB. How can I implement a MongoDB sink ? If there is any >> other solutions, I hope you can share with me. >> > > > -- > Best, Jingsong Lee >