Hi Harish,

Thanks for reporting this issue. There are currently 5 write modes:
1. Flush only on checkpoint:
   'sink.buffer-flush.interval' = '-1' and 'sink.buffer-flush.max-rows' = '-1'
2. Flush for every single element:
   'sink.buffer-flush.interval' = '0' or 'sink.buffer-flush.max-rows' = '1'
3. Flush when the time interval exceeds the limit:
   'sink.buffer-flush.interval' > '0' and 'sink.buffer-flush.max-rows' = '-1'
4. Flush when the batch size exceeds the limit:
   'sink.buffer-flush.max-rows' > '1' and 'sink.buffer-flush.interval' = '-1'
5. Flush when either the time interval or the batch size exceeds the limit:
   'sink.buffer-flush.interval' > '0' and 'sink.buffer-flush.max-rows' > '1'

The default write mode is 5, with 'sink.buffer-flush.interval' = '1s' and 'sink.buffer-flush.max-rows' = '1000'.

Whether buffered data is flushed to MongoDB is decided at checkpoint time or when the next record arrives. Currently there is no separate thread that periodically checks whether the interval since the last written record has exceeded the limit. Therefore, when checkpointing is not enabled, writes can be delayed. You can enable checkpointing or use the per-record write mode (mode 2) to avoid this problem; a minimal configuration sketch is included after the quoted message below.

Best,
Jiabao

> On 27 Jun 2023, at 18:00, <harish.srid...@kpn.com.invalid> <harish.srid...@kpn.com.INVALID> wrote:
>
> Hi,
>
> I am using Flink version 1.17.1 and flink-mongodb-sql-connector version 1.0.1-1.17.
>
> Below is the data pipeline flow:
>
> Source 1 (Kafka topic using Kafka connector) -> Window Aggregation (legacy grouped window aggregation) -> Sink (Kafka topic using upsert-kafka connector) -> Sink (MongoDB collection).
>
> I noticed a couple of issues with the MongoDB sink connector.
>
> Issue 1: The Mongo sink delays an event write as soon as a tombstone message is received.
>
> When a new key comes in, the aggregation is made and the result is written successfully to the Kafka topic and also to MongoDB immediately.
>
> When another new key comes in, the aggregation is made and the result is available in MongoDB.
>
> But when a key repeats for the first time, the aggregation is made and the results are written to the Kafka topic: you get one delete record and one latest record for the key in the topic. The data for the key is deleted in MongoDB, but the latest record for the key is not inserted into MongoDB.
>
> When a new key or another key comes in, the latest record for the previous key is inserted into MongoDB.
>
> The same pattern exists for subsequent records.
>
> There is always a delay of one event as soon as a tombstone record is found by the connector.
>
> Issue 2: The Mongo sink waits for a new record before writing previous records.
>
> I have an upsert-kafka topic that already contains some events.
> I start a new upsert-kafka to MongoDB sink job.
>
> I expect all the data from the topic to be loaded into MongoDB right away.
> But instead, only the first record is written to MongoDB.
> The rest of the records don't arrive in MongoDB until a new event is written to the Kafka topic.
> The event that was just written is then delayed until the next event arrives.
>
> I am not able to understand this behaviour.
> This doesn't feel like expected behaviour.
> Can someone please advise whether I am missing something or an issue exists.
>
> Regards,
> Harish.
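
For reference, a minimal sketch of the two workarounds mentioned above, assuming a Flink SQL job. The 'sink.buffer-flush.*' options are the ones discussed in this thread; the connection options ('uri', 'database', 'collection'), the checkpoint interval, and all table and column names are placeholders for illustration, so please check them against the MongoDB connector documentation for your version.

  -- Workaround A: keep the default batching behaviour but enable
  -- checkpointing, so the buffer is flushed at least on every checkpoint.
  SET 'execution.checkpointing.interval' = '10s';

  -- Workaround B: per-record writes (mode 2 above), at the cost of
  -- one MongoDB write per element.
  CREATE TABLE mongo_sink (
    id  BIGINT,
    cnt BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mongodb',
    'uri' = 'mongodb://localhost:27017',   -- placeholder connection string
    'database' = 'mydb',                   -- placeholder database
    'collection' = 'my_collection',        -- placeholder collection
    'sink.buffer-flush.max-rows' = '1',
    'sink.buffer-flush.interval' = '0'
  );

  -- 'aggregated_results' stands in for your upsert-kafka / aggregation result table.
  INSERT INTO mongo_sink SELECT id, cnt FROM aggregated_results;

Either setting alone should be enough to avoid the one-event delay you observed; combining them just bounds the delay from both directions.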