Hi Kostas,

Thank you very much for your answer.

Yes, in https://github.com/apache/flink/pull/4997 I proposed changing the
comparison to modificationTime < globalModificationTime (no longer accepting
equals). Later, however, I realized, as you correctly point out, that this
creates duplicates: a file whose modification time equals the current
globalModificationTime is no longer ignored, so it gets picked up again on
every directory scan.

The original and now deprecated FileMonitoringFunction.java indeed kept a
map of filenames to their timestamps.

That works. However, this memory consumption is likely too much for my
application, as I may process millions of files.

What I’ve done so far is create my own
MyPatchedContinuousFileMonitoringFunction, which keeps a similar map but
implements it as a LinkedHashMap bounded to a desired maximum number of
entries, as in:

// Bounded map of recently seen filenames: the eldest entry is evicted once
// the size exceeds MAX_ENTRIES (a constant I define elsewhere, e.g.
// private static final int MAX_ENTRIES = 10_000;).
private volatile Map<String, Boolean> filenamesAlreadySeen = new LinkedHashMap<String, Boolean>() {

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            return size() > MAX_ENTRIES;
        }
    };
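
(A note on the eviction order: with the default constructor, LinkedHashMap
evicts the eldest entry by insertion order; for true LRU eviction, i.e. by
least recent access, one would use the three-argument constructor
new LinkedHashMap<>(16, 0.75f, true). Insertion order seems good enough
here, since I only need to remember recently discovered filenames.)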

and then changed shouldIgnore to:

private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert (Thread.holdsLock(checkpointLock));
        // Ignore a file if its name was seen recently, or if it is strictly
        // older than the global modification-time watermark.
        boolean alreadySeen = filenamesAlreadySeen.containsKey(filePath.getName());
        boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
        filenamesAlreadySeen.put(filePath.getName(), true);

        if (shouldIgnore && LOG.isDebugEnabled()) {
            LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime +
                " and global mod time= " + globalModificationTime);
        }
        return shouldIgnore;
    }

This is a partial solution that works for me for now. However, it’s still a
hack and a very particular solution.
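
In case it’s useful, here is a rough sketch of how I understand your
suggestion below (all the names are mine, just for illustration, and the
set is a plain java.util.HashSet): remember filenames only for the single
latest globalModificationTime, and clear the set whenever that timestamp
advances:

private final Set<String> namesAtLatestTimestamp = new HashSet<>();
private long globalModificationTime = Long.MIN_VALUE;

private boolean shouldIgnore(Path filePath, long modificationTime) {
        if (modificationTime < globalModificationTime) {
            // strictly older than files already processed: safe to skip
            return true;
        }
        if (modificationTime > globalModificationTime) {
            // newer timestamp: advance the watermark and forget the old names
            globalModificationTime = modificationTime;
            namesAtLatestTimestamp.clear();
            namesAtLatestTimestamp.add(filePath.getName());
            return false;
        }
        // same timestamp as the watermark: ignore names we have already added
        return !namesAtLatestTimestamp.add(filePath.getName());
    }

In the real ContinuousFileMonitoringFunction the global timestamp is
advanced after each directory scan rather than per file, so the logic would
have to be split accordingly, but the memory bound is the point: only the
filenames sharing the single latest timestamp are retained.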

I think the real solution would be to also use the accessTime (not only the
modificationTime). However, as I pointed out in the GitHub pull request, as
of Flink 1.3.2, the access time is always 0, at least on my machine and
local file system (macOS).
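
For anyone who wants to reproduce the accessTime observation, a minimal
check along these lines (the file path is just an example) prints 0 for the
access time on my machine:

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class AccessTimeCheck {

    public static void main(String[] args) throws Exception {
        // Example path: point this at any existing local file.
        Path path = new Path("file:///tmp/somefile");
        FileSystem fs = path.getFileSystem();
        FileStatus status = fs.getFileStatus(path);
        System.out.println("modification time: " + status.getModificationTime());
        // Prints 0 for me (macOS local file system, Flink 1.3.2):
        System.out.println("access time: " + status.getAccessTime());
    }
}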

On Mon, 13 Nov 2017 at 10:47 Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Juan,
>
> The problem is that once a file for a certain timestamp is processed and
> the global modification timestamp is updated,
> all files for that timestamp are considered processed.
>
> The solution is *not* to remove the = from the modificationTime <=
> globalModificationTime check in ContinuousFileMonitoringFunction, as this
> would lead to duplicates.
> The solution, in my opinion, is to keep a list of the filenames (or hashes)
> of the files processed for the last globalModTimestamp (and only for that
> timestamp), and when new files arrive with the same timestamp, check
> whether their names are in that list.
>
> This way you pay a bit of memory but you get what you want.
>
> What do you think?
>
> Thanks,
> Kostas
>
> On Nov 10, 2017, at 12:54 PM, Juan Miguel Cejuela <jua...@tagtog.net>
> wrote:
>
> Hi there,
>
> I’m trying to watch a directory for new incoming files (with
> StreamExecutionEnvironment#readFile) with subsecond latency (a watch
> interval of ~100ms, using the flag
> FileProcessingMode.PROCESS_CONTINUOUSLY).
>
> If many files come in within (i.e., under) the watch interval, Flink
> doesn’t seem to take notice of them, and as a result the files do not
> get processed. The behavior also seems nondeterministic, as it likely
> depends on timeouts and so on. For example, 10 new files come in at
> once (that is, essentially in parallel) and perhaps 3 files get
> processed, but the other 7 don’t.
>
> I’ve extended FileInputFormat to create my own input format, which does
> little more than log, in its open function, when a new file comes in.
> That’s how I know that many files get lost.
>
> On the other hand, when I restart Flink, *all* the files in the directory
> are immediately processed. This is the expected behavior and works fine.
>
> The situation of unprocessed files is a bummer.
>
> Am I doing something wrong? Do I need to set something in the
> configuration? Is it a bug in Flink?
>
> Hopefully I described my problem clearly.
>
> Thank you.