Hi Kostas,

I think it would be good to open two JIRAs to track these issues:
1) to document the shortcomings of the current solution, and
2) to propose a solution based on your idea of group-ids.

Would you like to do that?

Thanks, Fabian

2016-12-01 10:48 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Kostas,
>
> Thanks for bringing up this issue and the good explanation!
>
> I think we need to do two things:
>
> 1) Clearly explain the limitations of the current version in the online
> documentation and JavaDocs.
> This should point out that the source only works correctly with event-time
> and timestamp/watermark assigners if the timestamps of records, which are
> read from files in mod-time order, are monotonically increasing.
>
> 2) Implement a solution that handles event-time for files from different
> sources correctly.
> I like your proposal to define file-groups based on the file name.
> I believe in some sense this is similar to the problem of reading from
> different Kafka partitions and assigning watermarks there.
> There are different ways to implement this. Either we assign complete
> file-groups to readers and keep the current watermark there (similar to
> how the Kafka consumer works, I assume), or we allow splits to be
> distributed freely to arbitrary readers and put a bit more effort into
> tracking the watermarks for each file-group.
> In either case, we might need to know the set of file-groups in advance,
> or have a good way to deal with files that appear and start a new
> file-group.
>
> This is not trivial and we need a good design for this.
>
> Cheers,
> Fabian
>
> 2016-12-01 10:05 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
>> Hi all,
>>
>> This is to open a discussion on how to better handle event-time in
>> continuous file processing.
>>
>> To illustrate the problem, we will use the example of processing hourly
>> server logs.
>> Here, each server writes its logs to hourly files named:
>>
>> server-id-timestamp
>>
>> Assumptions:
>> 1) we have two servers producing logs, server-1 and server-2
>> 2) they have produced one file each, e.g. for 10am to 11am, so
>> server-1-10 and server-2-10
>> 3) our job has a parallelism of 2, so the ContinuousFileMonitoringFunction
>> has parallelism 1 and the reader has parallelism 2
>> 4) records within each file have timestamps in order, or moderately
>> out-of-order
>> 5) each log file is divided into two splits by the underlying filesystem,
>> e.g. server-1-10-1 and server-1-10-2
>>
>> In the scenario above, and in the current implementation of continuous
>> file processing, the monitoring function will:
>> 1) sort the files in ascending modification time,
>> 2) compute the splits of each file, and
>> 3) forward the splits, ordered by modification timestamp and offset
>> within the file, to randomly chosen downstream readers.
>>
>> Given the above, reader-1 will take server-1-10-1, and reader-2
>> server-1-10-2.
>> Focusing on reader-1, as soon as it gets its split, it will start reading
>> the contained elements and assign timestamps to them based on a
>> user-specified timestamp extractor (this may happen later in the pipeline,
>> but that does not affect the generality of the problem).
>> In addition, given that we are operating in event time, the reader will
>> also start emitting watermarks based on the timestamps assigned to the
>> elements it has read.
>>
>> Consequently, after the 2 readers have processed server-1-10-1 and
>> server-1-10-2, the watermark will have advanced to somewhere in the
>> middle of the timestamps included in the file (the file holds logs from
>> 10 to 11 am, and downstream operators take the minimum of the two
>> readers' watermarks, i.e. roughly the end of server-1-10-1).
>>
>> So when the splits of file server-2-10 are processed, elements at the
>> beginning of that file will be dropped as late.
>>
>> Proposed Solution:
>>
>> To address this, we could do the following:
>>
>> 1) Split the files (and their corresponding splits) into file-groups,
>> e.g. based on a user-specified parser of the filename.
>> 2) Order files/splits within the same file-group so that server-1-10 is
>> processed before server-1-11. This can be done with the same filename
>> parser mentioned before.
>> 3) In each reader task, keep a watermark emitter/timestamp extractor and
>> a (candidate) watermark per file-group. The watermark emitted by each
>> task should be the minimum across all its file-groups.
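Steps 1) and 2) of the proposal (grouping files by a user-specified filename parser, then ordering each group by the timestamp encoded in the name) could look roughly like the following Java sketch. It assumes the `server-id-timestamp` naming scheme from the example; `ParsedFileName`, `FileNameParser`, `ServerLogFileNameParser` and `FileGrouping` are hypothetical names used only for illustration, not existing Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical result of parsing a file name such as "server-1-10".
final class ParsedFileName {
    final String group;    // file-group key, e.g. "server-1"
    final long timestamp;  // ordering key within the group, e.g. the hour 10

    ParsedFileName(String group, long timestamp) {
        this.group = group;
        this.timestamp = timestamp;
    }
}

// Hypothetical user-specified parser (step 1 of the proposal); not a Flink interface.
interface FileNameParser {
    ParsedFileName parse(String fileName);
}

// Parser for the assumed "server-id-timestamp" naming scheme.
final class ServerLogFileNameParser implements FileNameParser {
    // Everything before the last '-' is the group; the trailing digits are the timestamp.
    private static final Pattern NAME = Pattern.compile("(.+)-(\\d+)");

    @Override
    public ParsedFileName parse(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected file name: " + fileName);
        }
        return new ParsedFileName(m.group(1), Long.parseLong(m.group(2)));
    }
}

final class FileGrouping {
    private FileGrouping() {}

    // Groups files by file-group (step 1) and orders each group by its parsed timestamp (step 2).
    static Map<String, List<String>> groupAndOrder(List<String> fileNames, FileNameParser parser) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String name : fileNames) {
            groups.computeIfAbsent(parser.parse(name).group, k -> new ArrayList<>()).add(name);
        }
        for (List<String> files : groups.values()) {
            files.sort(Comparator.comparingLong((String f) -> parser.parse(f).timestamp));
        }
        return groups;
    }
}

class FileGroupingDemo {
    public static void main(String[] args) {
        // Input deliberately not in name order, as modification times might produce.
        List<String> files = Arrays.asList("server-2-10", "server-1-11", "server-1-10");
        System.out.println(FileGrouping.groupAndOrder(files, new ServerLogFileNameParser()));
        // prints {server-1=[server-1-10, server-1-11], server-2=[server-2-10]}
    }
}
```

Ordering within a group by the name-encoded timestamp, rather than by modification time, is what lets server-1-10 be read before server-1-11 regardless of when each file was last touched.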
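The late-data problem from the scenario, and step 3) of the proposal (one candidate watermark per file-group, emitting the minimum), can be illustrated with a small Java simulation. `PerGroupWatermarkTracker` is a hypothetical helper, not a Flink API. The sketch assumes a bounded-out-of-orderness candidate watermark per group (maximum timestamp seen minus a fixed bound), timestamps expressed as minutes of the day, and that all file-groups are registered before reading starts, which is one of the options raised in the reply. As a simplification, a record counts as late when its timestamp is not greater than the current watermark; the real effect depends on the downstream windows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-reader helper for step 3 of the proposal; not an existing Flink class.
// Keeps one candidate watermark per file-group and emits the minimum across groups.
final class PerGroupWatermarkTracker {
    private final long maxOutOfOrderness;  // assumed fixed bound
    private final Map<String, Long> maxTimestampPerGroup = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    PerGroupWatermarkTracker(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Registers a file-group before any of its records are read.
    void registerGroup(String group) {
        maxTimestampPerGroup.putIfAbsent(group, Long.MIN_VALUE);
    }

    // Called for every record after its timestamp has been extracted.
    void onRecord(String group, long timestamp) {
        maxTimestampPerGroup.merge(group, timestamp, Long::max);
    }

    // Minimum candidate watermark across all known file-groups.
    private long minCandidate() {
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampPerGroup.values()) {
            // A group without records yet holds the task's watermark back completely.
            long candidate = (maxTs == Long.MIN_VALUE) ? Long.MIN_VALUE : maxTs - maxOutOfOrderness;
            min = Math.min(min, candidate);
        }
        return maxTimestampPerGroup.isEmpty() ? Long.MIN_VALUE : min;
    }

    // Watermark to emit; never moves backwards, even if a new group is registered later.
    long emitWatermark() {
        lastEmitted = Math.max(lastEmitted, minCandidate());
        return lastEmitted;
    }
}

class PerGroupWatermarkDemo {
    public static void main(String[] args) {
        long bound = 5; // assumed out-of-orderness bound, in minutes

        // One watermark for all files, as in the scenario described in the thread.
        PerGroupWatermarkTracker single = new PerGroupWatermarkTracker(bound);
        // One candidate watermark per file-group, with both groups known up front.
        PerGroupWatermarkTracker perGroup = new PerGroupWatermarkTracker(bound);
        perGroup.registerGroup("server-1");
        perGroup.registerGroup("server-2");

        // The reader consumes server-1-10-1: minutes 600..629 of the day (10:00 to 10:29).
        for (long t = 600; t < 630; t++) {
            single.onRecord("all-files", t);
            perGroup.onRecord("server-1", t);
        }

        long firstServer2Record = 600; // 10:00, the start of server-2-10
        // Simplification: a record is late if its timestamp is <= the current watermark.
        System.out.println("single watermark, late: " + (firstServer2Record <= single.emitWatermark()));
        System.out.println("per-group watermark, late: " + (firstServer2Record <= perGroup.emitWatermark()));
        // prints:
        // single watermark, late: true
        // per-group watermark, late: false
    }
}
```

With a single watermark, the first server-2-10 record (10:00) is behind the watermark (10:24) and would be dropped; with per-group tracking, the still-empty server-2 group holds the watermark back. Because the sketch never lets the emitted watermark move backwards, a file-group that only appears after the watermark has passed its early records would still see them as late, which is the open question about new file-groups raised in the reply.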