I agree, if the sink doesn't properly work without checkpointing we
should make sure that it fails early if it used that way.
It would be great if you could open a JIRA.
On 08.06.2018 10:08, Rinat wrote:
Piotr, thx for your reply, for now everything is pretty clear. But
from my point of view, it’s better to add some information about leaks
in case of disabled checkpointing into BucketingSink documentation
On 8 Jun 2018, at 10:35, Piotr Nowojski <pi...@data-artisans.com
<mailto:pi...@data-artisans.com>> wrote:
Hi,
BucketingSink is designed to provide exactly-once writes to file
system, which is inherently tied to checkpointing. As you just saw,
without checkpointing, BucketingSink is never notified that it can
commit pending files.
If you do not want to use checkpointing for some reasons, you could
always use for example
org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat
and write your own simple `OutputFormat` or look if one of the
existing ones meet your needs.
Piotrek
On 7 Jun 2018, at 14:23, Rinat <r.shari...@cleverdata.ru
<mailto:r.shari...@cleverdata.ru>> wrote:
Hi mates, we got some Flink jobs, that are writing data from kafka
into hdfs, using Bucketing-Sink.
For some reasons, those jobs are running without checkpointing. For
now, it not a big problem for us, if some files are remained opened
in case of job reloading.
Periodically, those jobs fail with *OutOfMemory *exception, and
seems, that I found a strange thing in the implementation of
BucketingSink.
During the sink lifecycle, we have a state object, implemented as a
map, where key is a bucket path, and value is a state, that contains
information about opened files and list of pending files.
After researching of the heap dump, I found, that those state stores
information about ~ 1_000 buckets and their state, all this stuff
weights ~ 120 Mb.
I’ve looked through the code, and found, that we removing the
buckets from the state, in *notifyCheckpointComplete *method.
@Override public void notifyCheckpointComplete(long checkpointId)throws
Exception {
Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
state.bucketStates.entrySet().iterator();
while (bucketStatesIt.hasNext()) {
if (!bucketState.isWriterOpen &&
bucketState.pendingFiles.isEmpty() &&
bucketState.pendingFilesPerCheckpoint.isEmpty()) {
// We've dealt with all the pending files and the writer for this
bucket is not currently open. // Therefore this bucket is currently
inactive and we can remove it from our state. bucketStatesIt.remove();
}
}
}
So, this looks like an issue, when you are using this sink in
checkpointless environment, because the data always added to the
state, but never removed.
Of course, we could enabled checkpointing, and use one of available
backends, but as for me, it seems like a non expected behaviour,
like I have an opportunity to run the job without checkpointing, but
really, if I do so,
I got an exception in sink component.
What do you think about this ? Do anyone got the same problem, and
how’ve you solved it ?
Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team
email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever
Sincerely yours,
*Rinat Sharipov*
Software Engineer at 1DMP CORE Team
email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26
CleverDATA
make your data clever