Hi all,

This is to open a discussion on how to better handle event time in continuous file processing.
To illustrate the problem, we will use the example of processing hourly server logs. Each server writes its logs into hourly files named server-id-timestamp.

Assumptions:
1) we have two servers producing logs, server-1 and server-2;
2) each has produced one file, e.g. for 10am to 11am, so server-1-10 and server-2-10;
3) our job has a parallelism of 2, so the ContinuousFileMonitoringFunction has parallelism 1 and the reader 2;
4) records within each file have timestamps in order, or moderately out of order;
5) each log file is split into two splits by the underlying filesystem, e.g. server-1-10-1 and server-1-10-2.

In the scenario above, with the current implementation of continuous file processing, the monitoring function will:
1) sort the files by ascending modification time,
2) compute the splits of each file, and
3) forward the splits, in order of the file's modification time and the split's offset within the file, to the downstream readers randomly.

Given the above, reader-1 will take server-1-10-1 and reader-2 server-1-10-2. Focusing on reader-1: as soon as it gets its split, it starts reading the contained elements and assigns timestamps to them using a user-specified timestamp extractor (this may happen later in the pipeline, but that does not change the nature of the problem). In addition, since we are operating in event time, the reader also starts emitting watermarks based on the timestamps of the elements it has read.

After the two readers have processed server-1-10-1 and server-1-10-2, the watermark will have advanced to somewhere in the middle of the time range covered by the file (the files contain logs from 10 to 11am): reader-1's watermark reaches roughly the end of the first half of the hour, reader-2's roughly the end of the hour, and downstream operators take the minimum of the two. As a result, when the splits of server-2-10 are processed, the elements at the beginning of that file will be dropped as late.

Proposed Solution:
To address this, we could do the following:
1) Split the files (and their corresponding splits) into file-groups, e.g. based on a user-specified parser of the filename (in our example, one group per server).
2) Order the files/splits within the same file-group so that server-1-10 is processed before server-1-11. This can be done with the same filename parser.
3) In each reader task, keep a watermark emitter/timestamp extractor and a (candidate) watermark per file-group. The watermark emitted by each task should be the minimum across all of its file-groups.
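A minimal sketch of the three steps, in plain Java with no Flink classes, might look like the following. Everything in it is hypothetical and for illustration only: the class and method names (FileGroupWatermarks, parse, groupAndOrder, Tracker, onElement), the filename format "<group>-<hour>", the timestamps expressed as minutes since midnight, the 5-minute out-of-orderness bound, and the simplification that an element is late when its timestamp is not greater than the current watermark. It also assumes, beyond what the proposal states, that each reader task is told about all file-groups up front (e.g. from the monitoring function's listing), so that a group that has not produced data yet holds the task's watermark back.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of per-file-group watermarking (Java 16+, not Flink API). */
public class FileGroupWatermarks {

    /** Parsed form of a file name such as "server-1-10". */
    record ParsedName(String group, int hour) {}

    /** Hypothetical user-specified parser: "server-1-10" -> group "server-1", hour 10. */
    static ParsedName parse(String fileName) {
        int lastDash = fileName.lastIndexOf('-');
        return new ParsedName(fileName.substring(0, lastDash),
                Integer.parseInt(fileName.substring(lastDash + 1)));
    }

    /** Steps 1 and 2: group files by the parsed group, then order each group by hour. */
    static Map<String, List<String>> groupAndOrder(List<String> fileNames) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String name : fileNames) {
            groups.computeIfAbsent(parse(name).group(), g -> new ArrayList<>()).add(name);
        }
        for (List<String> files : groups.values()) {
            files.sort(Comparator.comparingInt(n -> parse(n).hour()));
        }
        return groups;
    }

    /** Step 3: one candidate watermark per file-group; the task emits the minimum. */
    static final class Tracker {
        private final long maxOutOfOrderness; // assumed bound, same unit as the timestamps
        private final Map<String, Long> maxTimestampPerGroup = new HashMap<>();

        Tracker(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        /** Assumption: groups are registered up front, so an unseen group holds the minimum back. */
        void register(String group) {
            maxTimestampPerGroup.putIfAbsent(group, Long.MIN_VALUE);
        }

        /** Records an element's timestamp for its group and returns the task-level watermark. */
        long onElement(String group, long timestamp) {
            maxTimestampPerGroup.merge(group, timestamp, Math::max);
            return currentWatermark();
        }

        /** Minimum over groups of (max timestamp - bound), i.e. the minimum candidate watermark. */
        long currentWatermark() {
            long minOfMax = maxTimestampPerGroup.values().stream()
                    .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
            // Avoid underflow while some group has not produced any element yet.
            return minOfMax == Long.MIN_VALUE ? Long.MIN_VALUE : minOfMax - maxOutOfOrderness;
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups =
                groupAndOrder(List.of("server-2-10", "server-1-11", "server-1-10"));
        System.out.println(groups); // {server-1=[server-1-10, server-1-11], server-2=[server-2-10]}

        Tracker tracker = new Tracker(5);
        groups.keySet().forEach(tracker::register);

        // The reader has read server-1-10 up to 10:45 (minute 645).
        long wm = tracker.onElement("server-1", 645);
        System.out.println(wm == Long.MIN_VALUE); // true: server-2 has no data yet

        // A single, ungrouped watermark would already be at 645 - 5 = 640, making the first
        // server-2-10 record at 10:00 (minute 600) late. Here it is not late.
        System.out.println(600 > wm); // true
        System.out.println(tracker.onElement("server-2", 600)); // 595
    }
}
```

Keeping the maximum timestamp per group and subtracting the bound once is equivalent to taking the minimum of the per-group candidate watermarks, as long as every group uses the same bound. Because each per-group maximum only grows, the emitted minimum is non-decreasing for a fixed set of groups; if a reader instead learned about server-2 only when its first split arrived, it could already have emitted 640 and the original problem would reappear.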