Chesnay, thanks for your reply. I’ve created one: https://issues.apache.org/jira/browse/FLINK-9558
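A minimal sketch of the fail-early behaviour Chesnay suggests in the quoted reply. It assumes a sink that, like BucketingSink, commits files only when a checkpoint completes. The class name `CheckpointRequiredSink` is hypothetical, and the checkpointing flag is passed in directly so the example compiles without Flink. Inside a real `RichSinkFunction`, the flag could be read in `open()` from `((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()`.

```java
/**
 * Hypothetical sketch of a fail-early check for a sink that only commits
 * files on checkpoint completion. The checkpointing flag is a plain parameter
 * here so the example runs without Flink on the classpath.
 */
final class CheckpointRequiredSink {

    private boolean opened;

    /** Called once before any records are written, like a rich sink's open(). */
    void open(boolean checkpointingEnabled) {
        if (!checkpointingEnabled) {
            // Fail at startup instead of silently accumulating per-bucket state
            // that would only be cleaned up on checkpoint completion.
            throw new IllegalStateException(
                "This sink commits files only when a checkpoint completes; "
                    + "enable checkpointing (StreamExecutionEnvironment#enableCheckpointing) "
                    + "or use a non-transactional OutputFormat instead.");
        }
        opened = true;
    }

    boolean isOpened() {
        return opened;
    }

    public static void main(String[] args) {
        CheckpointRequiredSink sink = new CheckpointRequiredSink();
        sink.open(true);
        System.out.println("opened with checkpointing: " + sink.isOpened());
        try {
            new CheckpointRequiredSink().open(false);
        } catch (IllegalStateException e) {
            System.out.println("rejected without checkpointing: " + e.getMessage());
        }
    }
}
```

Failing in `open()` surfaces the misconfiguration when the job starts, rather than as an OutOfMemoryError after the job has been running for a while.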
> On 8 Jun 2018, at 12:58, Chesnay Schepler <ches...@apache.org> wrote:
>
> I agree: if the sink doesn't work properly without checkpointing, we should
> make sure that it fails early if it is used that way.
>
> It would be great if you could open a JIRA.
>
> On 08.06.2018 10:08, Rinat wrote:
>> Piotr, thanks for your reply; everything is pretty clear now. But from my
>> point of view, it would be better to add some information about the leak
>> when checkpointing is disabled to the BucketingSink documentation.
>>
>>> On 8 Jun 2018, at 10:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>> Hi,
>>>
>>> BucketingSink is designed to provide exactly-once writes to a file system,
>>> which is inherently tied to checkpointing. As you just saw, without
>>> checkpointing, BucketingSink is never notified that it can commit pending
>>> files.
>>>
>>> If you do not want to use checkpointing for some reason, you could instead
>>> use, for example,
>>> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat
>>> and write your own simple `OutputFormat`, or check whether one of the
>>> existing ones meets your needs.
>>>
>>> Piotrek
>>>
>>>> On 7 Jun 2018, at 14:23, Rinat <r.shari...@cleverdata.ru> wrote:
>>>>
>>>> Hi mates, we have some Flink jobs that write data from Kafka into
>>>> HDFS using BucketingSink.
>>>> For various reasons, those jobs run without checkpointing. For now,
>>>> it's not a big problem for us if some files remain open when a job
>>>> is reloaded.
>>>>
>>>> Periodically, those jobs fail with an OutOfMemoryError, and it seems
>>>> that I found a strange thing in the implementation of BucketingSink.
>>>>
>>>> During the sink lifecycle, we have a state object, implemented as a map,
>>>> where the key is a bucket path and the value is a state that contains
>>>> information about open files and a list of pending files.
>>>> After analyzing the heap dump, I found that this state holds
>>>> information about ~1,000 buckets and their state, and all of it weighs
>>>> ~120 MB.
>>>>
>>>> I’ve looked through the code and found that buckets are removed from
>>>> the state only in the notifyCheckpointComplete method:
>>>>
>>>> @Override
>>>> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>>>     Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
>>>>         state.bucketStates.entrySet().iterator();
>>>>     while (bucketStatesIt.hasNext()) {
>>>>         BucketState<T> bucketState = bucketStatesIt.next().getValue();
>>>>         if (!bucketState.isWriterOpen &&
>>>>             bucketState.pendingFiles.isEmpty() &&
>>>>             bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>>>>
>>>>             // We've dealt with all the pending files and the writer for this bucket is not currently open.
>>>>             // Therefore this bucket is currently inactive and we can remove it from our state.
>>>>             bucketStatesIt.remove();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> So this looks like an issue when the sink is used in an environment
>>>> without checkpointing, because data is always added to the state but
>>>> never removed.
>>>> Of course, we could enable checkpointing and use one of the available
>>>> state backends, but to me this seems like unexpected behaviour: I have
>>>> the option to run the job without checkpointing, but if I actually do so,
>>>> I get an exception in the sink component.
>>>>
>>>> What do you think about this? Has anyone had the same problem, and how
>>>> did you solve it?
>>>>
>>>> Sincerely yours,
>>>> Rinat Sharipov
>>>> Software Engineer at 1DMP CORE Team
>>>>
>>>> email: r.shari...@cleverdata.ru
>>>> mobile: +7 (925) 416-37-26
>>>>
>>>> CleverDATA
>>>> make your data clever
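To illustrate the leak Rinat describes, here is a simplified, hypothetical model of the per-bucket bookkeeping; it is not BucketingSink's actual code. As in the quoted `notifyCheckpointComplete` excerpt, bucket entries are pruned only inside that callback, so when checkpointing is disabled and the callback never fires, every distinct bucket path stays in the map. The class name, the bucket paths, and the `simulate` helper are made up for the example, and `pendingFiles`/`pendingFilesPerCheckpoint` are collapsed into a single list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of a bucketing sink's state: a map from
 * bucket path to per-bucket state that is only pruned on checkpoint completion.
 */
final class BucketStateLeakDemo {

    /** Per-bucket state: an open-writer flag plus files waiting to be committed. */
    static final class BucketState {
        boolean isWriterOpen;
        final List<String> pendingFiles = new ArrayList<>();
    }

    private final Map<String, BucketState> bucketStates = new HashMap<>();
    private int fileCounter = 0;

    /** Writing a record opens (or reuses) the writer for its bucket. */
    void write(String bucketPath) {
        BucketState s = bucketStates.computeIfAbsent(bucketPath, p -> new BucketState());
        s.isWriterOpen = true;
    }

    /** Closing a writer turns its in-progress file into a pending file. */
    void closeWriter(String bucketPath) {
        BucketState s = bucketStates.get(bucketPath);
        if (s != null && s.isWriterOpen) {
            s.isWriterOpen = false;
            s.pendingFiles.add(bucketPath + "/part-" + (fileCounter++));
        }
    }

    /** Commits pending files, then drops buckets that are no longer active. */
    void notifyCheckpointComplete() {
        Iterator<Map.Entry<String, BucketState>> it = bucketStates.entrySet().iterator();
        while (it.hasNext()) {
            BucketState s = it.next().getValue();
            s.pendingFiles.clear(); // stands in for moving pending files to their final names
            if (!s.isWriterOpen && s.pendingFiles.isEmpty()) {
                it.remove();
            }
        }
    }

    int trackedBuckets() {
        return bucketStates.size();
    }

    /** Writes to `buckets` distinct bucket paths, with or without checkpoint notifications. */
    static int simulate(int buckets, boolean checkpointing) {
        BucketStateLeakDemo sink = new BucketStateLeakDemo();
        for (int i = 0; i < buckets; i++) {
            String path = "/data/bucket-" + i; // hypothetical bucket paths
            sink.write(path);
            sink.closeWriter(path);
            if (checkpointing) {
                sink.notifyCheckpointComplete();
            }
        }
        return sink.trackedBuckets();
    }

    public static void main(String[] args) {
        System.out.println("without checkpointing: " + simulate(1_000, false));
        System.out.println("with checkpointing:    " + simulate(1_000, true));
    }
}
```

In this model, `simulate(1_000, false)` leaves 1,000 entries in the map (each still holding a pending file), while `simulate(1_000, true)` leaves none, which matches the growth pattern seen in the heap dump.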