Chesnay, thanks for your reply. I’ve created one:
https://issues.apache.org/jira/browse/FLINK-9558


> On 8 Jun 2018, at 12:58, Chesnay Schepler <ches...@apache.org> wrote:
> 
> I agree, if the sink doesn't work properly without checkpointing we should 
> make sure that it fails early if it is used that way.
> 
> It would be great if you could open a JIRA.
> 
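
A minimal sketch of the fail-early idea above. It is not Flink code: the class
FailFastSinkSketch and its checkpointingEnabled flag are hypothetical, and how
a real sink would learn that flag from the runtime is left out. The only point
is that the check runs in open(), before any record is written.

```java
// Hypothetical sink used only for illustration; not the Flink BucketingSink.
final class FailFastSinkSketch {

    // Assumption: the caller knows whether checkpointing is enabled.
    private final boolean checkpointingEnabled;
    private boolean opened;

    FailFastSinkSketch(boolean checkpointingEnabled) {
        this.checkpointingEnabled = checkpointingEnabled;
    }

    // Reject the configuration up front instead of leaking state later.
    void open() {
        if (!checkpointingEnabled) {
            throw new IllegalStateException(
                "This sink relies on checkpoint notifications to commit files; "
                    + "enable checkpointing or use a different sink.");
        }
        opened = true;
    }

    boolean isOpened() {
        return opened;
    }

    public static void main(String[] args) {
        FailFastSinkSketch sink = new FailFastSinkSketch(false);
        try {
            sink.open();
        } catch (IllegalStateException e) {
            System.out.println("Rejected at startup: " + e.getMessage());
        }
    }
}
```

With a check like this the job fails at startup with a clear message rather
than hours later with an OutOfMemory error.
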
> On 08.06.2018 10:08, Rinat wrote:
>> Piotr, thanks for your reply, for now everything is pretty clear. But from my 
>> point of view, it would be better to add a note to the BucketingSink 
>> documentation about the leak that occurs when checkpointing is disabled.
>> 
>>> On 8 Jun 2018, at 10:35, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>> 
>>> Hi,
>>> 
>>> BucketingSink is designed to provide exactly-once writes to the file system, 
>>> which is inherently tied to checkpointing. As you just saw, without 
>>> checkpointing, BucketingSink is never notified that it can commit pending 
>>> files. 
>>> 
>>> If you do not want to use checkpointing for some reason, you could always 
>>> use, for example, 
>>> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
>>> and write your own simple `OutputFormat`, or check whether one of the 
>>> existing ones meets your needs.
>>> 
>>> Piotrek
>>> 
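
A rough sketch of what such a simple output format could look like. To keep it
runnable without Flink on the classpath, it uses a local stand-in interface,
RecordOutputSketch, that only mirrors the open / writeRecord / close lifecycle;
that interface, the LineOutputSketch class and the PrintWriter target are all
assumptions made for illustration, not Flink's actual API.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical line-per-record writer; it keeps no pending-file state at all.
final class LineOutputSketch implements RecordOutputSketch<String> {

    private final PrintWriter target;
    private long written;

    LineOutputSketch(PrintWriter target) {
        this.target = target;
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        written = 0;
    }

    @Override
    public void writeRecord(String record) {
        // Write the record followed by a fixed '\n' separator.
        target.print(record);
        target.print('\n');
        written++;
    }

    @Override
    public void close() {
        target.flush();
    }

    long recordsWritten() {
        return written;
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        LineOutputSketch format = new LineOutputSketch(new PrintWriter(buffer));
        format.open(0, 1);
        format.writeRecord("first");
        format.writeRecord("second");
        format.close();
        System.out.print(buffer);
    }
}

// Local stand-in for an output-format lifecycle (an assumption, not Flink's interface).
interface RecordOutputSketch<T> {
    void open(int taskNumber, int numTasks);

    void writeRecord(T record);

    void close();
}
```

Nothing here waits for a checkpoint, so nothing accumulates, but the
exactly-once guarantee is gone as well.
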
>>>> On 7 Jun 2018, at 14:23, Rinat <r.shari...@cleverdata.ru> wrote:
>>>> 
>>>> Hi mates, we have some Flink jobs that write data from Kafka into HDFS 
>>>> using BucketingSink.
>>>> For some reasons, those jobs run without checkpointing. For now, it is 
>>>> not a big problem for us if some files remain open when a job is 
>>>> reloaded.
>>>> 
>>>> Periodically, those jobs fail with an OutOfMemory exception, and it seems 
>>>> that I found a strange thing in the implementation of BucketingSink.
>>>> 
>>>> During the sink lifecycle, we have a state object, implemented as a map, 
>>>> where the key is a bucket path and the value is a state that contains 
>>>> information about open files and a list of pending files.
>>>> After examining the heap dump, I found that this state stores 
>>>> information about ~1,000 buckets, and all of it weighs ~120 MB.
>>>> 
>>>> I’ve looked through the code and found that we remove the buckets from 
>>>> the state in the notifyCheckpointComplete method.
>>>> 
>>>> @Override
>>>> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>>>   Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
>>>>       state.bucketStates.entrySet().iterator();
>>>>   while (bucketStatesIt.hasNext()) {
>>>>     // Fetch the next bucket's state.
>>>>     BucketState<T> bucketState = bucketStatesIt.next().getValue();
>>>>     if (!bucketState.isWriterOpen &&
>>>>         bucketState.pendingFiles.isEmpty() &&
>>>>         bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>>>>       // We've dealt with all the pending files and the writer for this
>>>>       // bucket is not currently open. Therefore this bucket is currently
>>>>       // inactive and we can remove it from our state.
>>>>       bucketStatesIt.remove();
>>>>     }
>>>>   }
>>>> }
>>>> 
>>>> So, this looks like an issue when you use this sink in a checkpointless 
>>>> environment, because data is always added to the state but never 
>>>> removed.
>>>> Of course, we could enable checkpointing and use one of the available 
>>>> backends, but to me this seems like unexpected behaviour: I have the 
>>>> option to run the job without checkpointing, but if I actually do so, 
>>>> I get an exception in the sink component.
>>>> 
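
The growth described above can be reproduced with a small stand-alone
simulation. This is a simplified sketch, not the BucketingSink source: the
class BucketLeakSketch, its methods and the ".pending" file name are
hypothetical, and the real sink's two-stage handling of pending files is
collapsed into a single list. It only shows that entries are added on every
write and removed solely in the checkpoint callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's per-bucket bookkeeping; not Flink code.
final class BucketLeakSketch {

    static final class BucketState {
        boolean isWriterOpen;
        final List<String> pendingFiles = new ArrayList<>();
    }

    private final Map<String, BucketState> bucketStates = new HashMap<>();

    // Every record registers its bucket (if new) and marks the writer open.
    void write(String bucketPath) {
        BucketState state = bucketStates.computeIfAbsent(bucketPath, p -> new BucketState());
        state.isWriterOpen = true;
    }

    // An inactive bucket closes its writer; the file becomes pending.
    void closeBucket(String bucketPath) {
        BucketState state = bucketStates.get(bucketPath);
        if (state != null && state.isWriterOpen) {
            state.isWriterOpen = false;
            state.pendingFiles.add(bucketPath + "/part-0.pending");
        }
    }

    // The only place where entries leave the map; never called without checkpointing.
    void notifyCheckpointComplete() {
        Iterator<Map.Entry<String, BucketState>> it = bucketStates.entrySet().iterator();
        while (it.hasNext()) {
            BucketState state = it.next().getValue();
            state.pendingFiles.clear(); // stands in for committing the pending files
            if (!state.isWriterOpen && state.pendingFiles.isEmpty()) {
                it.remove();
            }
        }
    }

    int trackedBuckets() {
        return bucketStates.size();
    }

    public static void main(String[] args) {
        BucketLeakSketch sink = new BucketLeakSketch();
        for (int i = 0; i < 1000; i++) {
            sink.write("/data/bucket-" + i);
            sink.closeBucket("/data/bucket-" + i);
        }
        System.out.println(sink.trackedBuckets()); // prints 1000
        sink.notifyCheckpointComplete();
        System.out.println(sink.trackedBuckets()); // prints 0
    }
}
```

Without the callback the map keeps one entry per bucket ever seen, which
matches the ~1,000 entries found in the heap dump.
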
>>>> What do you think about this? Has anyone had the same problem, and how 
>>>> did you solve it?
>>>> 
>>>> Sincerely yours,
>>>> Rinat Sharipov
>>>> Software Engineer at 1DMP CORE Team
>>>> 
>>>> email: r.shari...@cleverdata.ru
>>>> mobile: +7 (925) 416-37-26
>>>> 
>>>> CleverDATA
>>>> make your data clever
>>>> 
>>> 
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever
