Hi Kostas,

Thanks for bringing up this issue, and for the good explanation!
I think we need to do two things:

1) Clearly explain the limitations of the current version in the online
documentation and JavaDocs. This should point out that the source only works
correctly with event time and timestamp/watermark assigners if the timestamps
of records, which are read from files in mod-time order, are monotonically
increasing.

2) Implement a solution that handles event time correctly for files from
different sources. I like your proposal to define file-groups based on the
file name. I believe this is in some sense similar to the problem of reading
from different Kafka partitions and assigning watermarks there.

There are different ways to implement this. Either we assign complete
file-groups to readers and keep the current watermark handling there (similar
to how the Kafka consumer works, I assume), or we allow splits to be
distributed to arbitrary readers and spend a bit more effort on tracking the
watermarks for each file-group. In either case, we might need to know the set
of file-groups in advance, or need a good way to deal with files that appear
and start a new file-group. This is not trivial and we need a good design for
this.

Cheers,
Fabian

2016-12-01 10:05 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:

> Hi all,
>
> This is to open a discussion on how to better handle event time in
> continuous file processing.
>
> For the sake of illustration, we will use the example of processing
> hourly server logs. In this case, each server writes its logs to hourly
> files with names of the form:
>
>     server-id-timestamp
>
> Assumptions:
> 1) We have two servers producing logs: server-1 and server-2.
> 2) They have produced one file each, e.g.
> for 10am to 11am: server-1-10 and server-2-10.
> 3) Our job has a parallelism of 2, so the
> ContinuousFileMonitoringFunction has parallelism 1 and the readers 2.
> 4) Records within each file have timestamps in order, or are
> moderately out-of-order.
> 5) Each log file is split into two splits by the underlying
> filesystem, e.g. server-1-10-1 and server-1-10-2.
>
> In the scenario above, with the current implementation of continuous
> file processing, the monitoring function will:
> 1) sort the files by ascending modification time,
> 2) compute the splits of each of the files, and
> 3) forward the splits, in order of modification timestamp and their
> offset in the file, to the downstream readers randomly.
>
> Given the above, reader-1 will take server-1-10-1, and reader-2,
> server-1-10-2.
>
> Focusing on reader-1: as soon as it gets its split, it will start
> reading the contained elements and assigning timestamps to them based
> on a user-specified timestamp extractor (this may happen later in the
> pipeline, but that does not break the generality of the problem). In
> addition, given that we are operating in event time, the reader will
> also start emitting watermarks based on the timestamps assigned to the
> elements it has read.
>
> In this case, after the 2 readers have processed server-1-10-1 and
> server-1-10-2, the watermark will have advanced somewhere into the
> middle of the timestamps contained in the file (the files hold logs
> for 10 to 11am). Consequently, when the splits of file server-2-10 are
> to be processed, elements at the beginning of that file will be
> dropped as late.
>
> Proposed Solution:
>
> To address this, we could do the following:
>
> 1) Partition the files (and their corresponding splits) into
> file-groups, e.g. based on a user-specified parser of the filename.
> 2) Files/splits within the same file-group should be ordered so that
> server-1-10 is processed before server-1-11. This can be done through
> the same filename parser mentioned before.
> 3) In each reader task, keep a watermark emitter/timestamp extractor
> and a (candidate) watermark per file-group. The watermark emitted by
> each task should be the minimum across all its file-groups.
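The per-file-group bookkeeping in point 3 of the proposal could be sketched
roughly as below. This is a minimal illustration, not Flink API: the class and
method names (PerFileGroupWatermarks, updateGroup, currentWatermark) are
hypothetical, and how the group name is parsed from the filename is left to
the user-specified parser mentioned in point 1.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: each reader task keeps a candidate watermark per
// file-group and only emits the minimum across all of its file-groups.
public class PerFileGroupWatermarks {
    private final Map<String, Long> groupWatermarks = new HashMap<>();

    // Called when the timestamp extractor advances the candidate watermark
    // of one group, e.g. group "server-1" parsed from file "server-1-10".
    public void updateGroup(String fileGroup, long candidateWatermark) {
        groupWatermarks.merge(fileGroup, candidateWatermark, Math::max);
    }

    // The watermark the task may emit: the minimum over all file-groups,
    // so a group that is still behind (e.g. server-2-10 not yet read)
    // holds back the task-wide watermark and its records are not dropped
    // as late.
    public long currentWatermark() {
        return groupWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PerFileGroupWatermarks wm = new PerFileGroupWatermarks();
        wm.updateGroup("server-1", 1030L); // server-1-10 read halfway
        wm.updateGroup("server-2", 1000L); // server-2-10 just started
        // Task-wide watermark is held back by the slowest group.
        System.out.println(wm.currentWatermark()); // prints 1000
    }
}
```

With per-group tracking like this, the late-data problem from the scenario
above disappears: server-1's progress can no longer push the watermark past
timestamps that server-2's files have yet to deliver.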