Hi Kostas,
I didn't see any discussion on the dev mailing list, so I'd like to share
our problems/solutions (we had a busy month... ;)
1. We refactored ContinuousFileMonitoringFunction so that its state includes
not only lastModificationTime, but also the list of files that have exactly
this modification time. This way we're sure that we don't lose any
files that appear later with the same modification time. It turned out that
for the local file system this is quite important, as modificationTime in
Java can have one-second resolution (see e.g.
http://stackoverflow.com/questions/24804618/get-file-mtime-with-millisecond-resolution-from-java
- we learned it the hard way...)
2. We are able to safely delete files in the following way:
- in ContinuousFileReaderOperator we emit an additional marker event
after the end of each split
- the split contains information on how many splits are in the file
- we added an additional operator with parallelism 1 after
ContinuousFileReaderOperator which tracks these marker events so that it
knows when all splits from a file have been processed, and deletes finished
files after the appropriate checkpoints have been completed.
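The split-counting part of point 2 could be sketched roughly as below. This is only an illustration, not the code from the refactoring: the class SplitCompletionTracker and its method onEndOfSplit are hypothetical names, the marker event is assumed to carry the file path and the total number of splits, and the checkpoint gating of the actual delete is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not a Flink class: counts end-of-split markers per file. */
class SplitCompletionTracker {
    // file path -> number of end-of-split markers seen so far
    private final Map<String, Integer> markersSeen = new HashMap<>();

    /**
     * Record one end-of-split marker. totalSplits is assumed to be carried by the marker.
     * Returns true when this marker is the last outstanding one for the file,
     * regardless of the order in which the splits finished.
     */
    boolean onEndOfSplit(String path, int totalSplits) {
        int seen = markersSeen.merge(path, 1, Integer::sum);
        if (seen < totalSplits) {
            return false;
        }
        markersSeen.remove(path); // file is complete, drop its counter
        return true;
    }
}
```

Because the tracker only counts markers, it does not matter which split of a file finishes last, which is why it has to live in a single operator instance that sees all markers.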
If you & other committers find these ideas ok, I can prepare JIRAs and
pull requests. While the first point is pretty straightforward IMHO, I'd
like to get some feedback on the second one.
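For reference, the state change from point 1 might look roughly like the sketch below. It is not the actual ContinuousFileMonitoringFunction code: ModificationTimeState and shouldProcess are hypothetical names, and the sketch assumes that within one directory scan files are offered in non-decreasing modification-time order.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical state holder: last modification time plus the files seen at exactly that time. */
class ModificationTimeState {
    private long lastModificationTime = Long.MIN_VALUE;
    private final Set<String> filesAtLastModificationTime = new HashSet<>();

    /** Returns true if the file has not been handled yet and should be forwarded for reading. */
    boolean shouldProcess(String path, long modificationTime) {
        if (modificationTime < lastModificationTime) {
            return false; // strictly older than what was already handled
        }
        if (modificationTime > lastModificationTime) {
            // newer timestamp: advance the watermark and start a fresh set
            lastModificationTime = modificationTime;
            filesAtLastModificationTime.clear();
        }
        // same timestamp as the watermark: accept only paths not seen before
        return filesAtLastModificationTime.add(path);
    }
}
```

With only the timestamp in the state, a second file arriving within the same one-second tick as the last processed file would be skipped; keeping the set of paths for that tick is what lets it through.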
thanks,
maciek
On 18/10/2016 11:52, Kostas Kloudas wrote:
Hi Maciek,
I agree with you that 1ms is often too long :P
This is the reason why I will open a discussion to have
all the ideas / requirements / shortcomings in a single place.
This way the community can track and influence what
is coming next.
Hopefully I will do it in the afternoon and I will send you
the discussion thread.
Cheers,
Kostas
On Oct 18, 2016, at 11:43 AM, Maciek Próchniak <m...@touk.pl> wrote:
Hi Kostas,
thanks for the quick answer.
I wouldn't dare to delete files in InputFormat if they were split and
processed in parallel...
As for using notifyCheckpointComplete - thanks for the suggestion, it looks pretty
interesting, I'll try it out. Although I wonder a bit if relying only on
the modification timestamp is enough - many things may happen in one ms :)
thanks,
maciek
On 18/10/2016 11:14, Kostas Kloudas wrote:
Hi Maciek,
Just a follow-up on the previous email: given that splits are read in parallel,
when the ContinuousFileMonitoringFunction forwards the last split, it does not
mean that the final split is going to be processed last. If the node it gets
assigned to is fast enough, then it may be processed faster than the others.
The assumption that the last split finishes last only holds if you have a
parallelism of 1.
Cheers,
Kostas
On Oct 18, 2016, at 11:05 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
Hi Maciek,
Currently this functionality is not supported, but it seems like a good
addition.
Actually, given that the feature is rather new, we were thinking of opening a
discussion on the dev mailing list in order to
i) discuss some current limitations of the Continuous File Processing source
ii) see how people use it and adjust our features accordingly
I will let you know as soon as I open this thread.
By the way, for your use case, we should probably have a callback in
notifyCheckpointComplete() that will inform the source that a given checkpoint
was successfully performed, and then we can purge the already processed files.
This can be a good solution.
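A rough sketch of the checkpoint-gated purge idea follows. It is plain Java and does not implement any Flink interface: the class CheckpointedFilePurger and the methods fileProcessed and snapshotState are hypothetical, and only the name notifyCheckpointComplete(long checkpointId) mirrors the callback mentioned above. The assumption is that a file may be deleted once a checkpoint taken after it was fully processed has completed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper: deletes processed files only after a covering checkpoint completes. */
class CheckpointedFilePurger {
    // files fully processed since the last snapshot was taken
    private final List<Path> processedSinceLastSnapshot = new ArrayList<>();
    // checkpoint id -> files that were already processed when that snapshot was taken
    private final TreeMap<Long, List<Path>> pendingByCheckpoint = new TreeMap<>();

    void fileProcessed(Path file) {
        processedSinceLastSnapshot.add(file);
    }

    /** Called when the snapshot for checkpointId is taken (assumed hook). */
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(processedSinceLastSnapshot));
        processedSinceLastSnapshot.clear();
    }

    /** Deletes every file covered by this checkpoint or an earlier one. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<Path>> covered = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<Path> files : covered.values()) {
            for (Path file : files) {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
        covered.clear(); // headMap is a view, so this removes the entries from the backing map
    }
}
```

Files that finish after a snapshot was taken but before its completion is reported stay on disk until the next checkpoint completes, so a restore from the last completed checkpoint can still re-read them.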
Thanks,
Kostas
On Oct 18, 2016, at 9:40 AM, Maciek Próchniak <m...@touk.pl> wrote:
Hi,
we want to monitor an HDFS (or local) directory, read CSV files that appear and,
after successful processing, delete them (mainly so as not to run out of disk
space...)
I'm not quite sure how to achieve this with the current implementation. Previously,
when we read binary data (unsplittable files), we made a small hack and deleted
them in our FileInputFormat, but now we want to use splits, and detecting which
split is 'the last one' is no longer so obvious. Of course, it's also
problematic when it comes to checkpointing...
So my question is: is there an idiomatic way of deleting processed files?
thanks,
maciek