I'm also not aware of a MongoDB sink in Flink. The code provided by
Jingsong applies to the "SinkFunction" interface of Flink. That's a good
starting point to implement a custom Sink.
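The core of that snippet is a batch-and-flush pattern: buffer each record in invoke(), and write the whole batch once it reaches a configured size (plus a final flush when the sink closes). Stripped of the Flink and MongoDB dependencies, the buffering logic can be sketched in plain Java; BatchBuffer and flushTarget are illustrative names, and in a real SinkFunction the flush callback would call mongoCollection.insertMany(batch):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the batch-and-flush pattern from the sink snippet.
// In an actual Flink sink, flushTarget would perform the MongoDB bulk insert.
public class BatchBuffer<T> {
    private final List<T> batch = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<T>> flushTarget;

    public BatchBuffer(int batchSize, Consumer<List<T>> flushTarget) {
        this.batchSize = batchSize;
        this.flushTarget = flushTarget;
    }

    // Analogous to invoke(): buffer the record, flush when the batch is full.
    public void add(T record) {
        batch.add(record);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Analogous to flush() in the snippet: hand off the batch and clear it.
    public void flush() {
        if (batch.isEmpty()) {
            return;
        }
        flushTarget.accept(new ArrayList<>(batch));
        batch.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> flushed = new ArrayList<>();
        BatchBuffer<Integer> buf = new BatchBuffer<>(3, flushed::add);
        for (int i = 0; i < 7; i++) {
            buf.add(i);
        }
        buf.flush(); // final flush, e.g. from close(), to avoid losing a partial batch
        System.out.println(flushed); // [[0, 1, 2], [3, 4, 5], [6]]
    }
}
```

Note the final flush: without it, records left in a partially filled batch would be dropped when the sink shuts down, so a real implementation should also flush in close() (and, for exactly-once needs, consider checkpointing via CheckpointedFunction).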

On Wed, May 6, 2020 at 9:46 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> My impression is that MongoDB's API is not complicated. So you can
> implement a MongoDB sink. Something like:
>
> @Override
> public void invoke(Row value, Context context) throws Exception {
>     Map<String, Object> map = new HashMap<>();
>     for (int i = 0; i < fieldNames.length; i++) {
>         map.put(fieldNames[i], value.getField(i));
>     }
>     batch.add(new Document(map));
>     if (batch.size() >= conf.getBatchSize()) {
>         flush();
>     }
> }
>
> private void flush() {
>     if (batch.isEmpty()) {
>         return;
>     }
>     MongoDatabase mongoDatabase = client.getDatabase(conf.getDatabase());
>     MongoCollection<Document> mongoCollection =
>         mongoDatabase.getCollection(conf.getCollection());
>     mongoCollection.insertMany(batch);
>     batch.clear();
> }
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 2:42 PM myflink <2644631...@qq.com> wrote:
>
>> my solution:
>> First, Flink sinks data to Kafka;
>> Second, MongoDB reads data from Kafka. Over.
>>
>>
>> ------------------ 原始邮件 ------------------
>> *发件人:* "Aissa Elaffani"<aissaelaff...@gmail.com>;
>> *发送时间:* 2020年5月6日(星期三) 下午3:17
>> *收件人:* "user"<user@flink.apache.org>;
>> *主题:* MongoDB sink;
>>
>> Hello,
>> I want to sink my data to MongoDB, but as far as I know there is no sink
>> connector for MongoDB. How can I implement a MongoDB sink? If there are any
>> other solutions, I hope you can share them with me.
>>
>
>
> --
> Best, Jingsong Lee
>
