Hi Kostas,

I think it would be good to open two JIRAs to track these issues:

1) to document the shortcomings of the current solution
2) propose a solution based on your idea of group-ids.

Would you like to do that?

Thanks,
Fabian

2016-12-01 10:48 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Kostas,
>
> Thanks for bringing up this issue and the good explanation!
>
> I think we need to do two things:
>
> 1) Clearly explain the limitations of the current version in the online
> documentation and JavaDocs.
> This should point out that the source does only work correctly with
> event-time and timestamp/watermark assigners if the timestamps of records,
> which are read from files in mod-time order, are monotonically increasing.
>
> 2) Implement a solution that handles event-time for files from different
> sources correctly.
> I like your proposal to defined file-groups based on the file name.
> I believe in some sense this is similar to the problem of reading from
> different Kafka partitions and assigning watermarks there.
> There are different ways to implement this. Either we can assign complete
> file-groups to readers and keep the current watermark there (similar as the
> Kafka consumer works I assume).
> Or we allow to completely distribute splits to arbitrary readers and have
> a bit more effort in tracking the watermarks for each file-group.
> In either case we might need to know the set of file-groups in advance or
> a good way to deal with files that appear and start a new file-group.
>
> This is not trivial and we need a good design for this.
>
> Cheers,
> Fabian
>
>
>
> 2016-12-01 10:05 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
>> Hi all,
>>
>> This is to open a discussion on how to better handle event-time in
>> continuous file processing.
>>
>> For the sake of illustration of the problem we will use the example of
>> processing hourly server logs.
>> In this case, each server writes its logs in hourly files, with names:
>>
>>
>> server-id-timestamp
>>
>> Assumptions:
>>         1) we have two servers producing logs server-1 and server-2
>>         2) they have produced one file each, e.g. for 10am to 11am, so
>> server-1-10, server-2-10
>>         3) our job has a parallelism of 2, so the
>> ContinuousFileMonitoringFunction has parallelism 1 and the reader 2
>>         4) records within each file have timestamps in order, or
>> moderately out-of-order
>>         5) each log file is split into two splits by the underlying
>> filesystem, e.g. server-1-10-1 and server-1-10-2
>>
>> In the scenario above, and in the current implementation of the
>> continuous file processing, the monitoring function will:
>>         1) sort the files on ascending modification time,
>>         2) compute the splits of each of the files and
>>         3) forward the splits in order of the modification timestamp and
>> their offset in the file to the downstream readers randomly.
>>
>> Given the above, reader-1 will take server-1-10-1, and reader-2,
>> server-1-10-2.
>> Focusing on reader-1, as soon as it gets its split, it will start reading
>> the contained elements and assign timestamps to them
>> based on a user-specified timestamp extractor (this may happen later in
>> the pipeline bit it does not break the generality of the problem).
>> In addition, given that we are operating in event time, the reader will
>> also start emitting watermarks based on the timestamps
>> assigned to the elements it has read.
>>
>> In this case, after processing server-1-10-1 and server-1-10-2 by the 2
>> readers, the watermark will have advanced somewhere in the
>> middle of the timestamps included in the file (files have logs for 10 to
>> 11 am).
>>
>> In this case, when the splits of file server-2-10 are to be processed,
>> elements in the beginning of the file will be dropped as late.
>>
>> Proposed Solution:
>>
>> To face this, we could do the following:
>>
>> 1) Split the files (and their corresponding splits) in file-groups e.g.
>> based on a user-specified parser of the filename.
>> 2) Files/splits within the same file-group should be ordered so that
>> server-1-10 is processed
>> before server-1-11. This can be done through the same filename parser
>> mentioned before.
>> 3) In each reader task, keep a watermark emitter/ timestamp extractor and
>> a (candidate) watermark per file-group. The watermark
>> emitted by each task should be the minimum across all its file-groups.
>
>
>

Reply via email to