Hi Esa,

Reading records from files with timestamps that need watermarks can be tricky. If you are familiar with Flink's watermark mechanism, you know that records should be ingested in (roughly) increasing timestamp order. This means that files usually cannot be split (i.e., each file needs to be read by a single task from start to end) and also need to be read in the right order (files with smaller timestamps first). In addition, each file should cover a time interval that does not overlap (too much) with the intervals of the other files.
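To make the ordering requirement concrete, here is a minimal, Flink-free sketch. It assumes a hypothetical naming scheme in which each file name encodes the date of its records (e.g. `events-2018-02-27.csv`); since ISO dates sort correctly as plain strings, sorting the names by that key yields the order in which the files should be read:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SplitOrdering {

    // Extracts the "yyyy-MM-dd" part of a hypothetical file name such as
    // "events-2018-02-27.csv". ISO dates compare correctly as strings,
    // so no date parsing is needed for ordering.
    static String timestampKey(String fileName) {
        int start = fileName.indexOf('-') + 1;
        int end = fileName.lastIndexOf('.');
        return fileName.substring(start, end);
    }

    // Returns the files sorted so that the file with the smallest
    // timestamps is read first.
    static List<String> inTimestampOrder(List<String> files) {
        files.sort(Comparator.comparing(SplitOrdering::timestampKey));
        return files;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>(Arrays.asList(
                "events-2018-02-27.csv",
                "events-2018-02-25.csv",
                "events-2018-02-26.csv"));
        System.out.println(inTimestampOrder(files));
        // prints [events-2018-02-25.csv, events-2018-02-26.csv, events-2018-02-27.csv]
    }
}
```

In Flink itself, this ordering would live in the InputSplitAssigner rather than in a plain sort, but the comparison logic is the same.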
Unfortunately, Flink does not provide good built-in support for reading files in a specific order.

If all files that you want to process are already present, you can implement a custom InputFormat by extending CsvInputFormat, set unsplittable to true, and override getInputSplitAssigner() to return an assigner that hands out the splits in the correct order.

If you want to process files as they appear, things might be a bit easier, provided that the timestamps in each new file are larger than the timestamps of the previous files. In this case, you can use StreamExecutionEnvironment.readFile() with the FileProcessingMode and interval parameters. With a correctly configured watermark assigner, it should be possible to get valid watermarks.

In any case, reading timestamped data from files is much trickier than ingesting data from an event log, which provides the events in the same order in which they were written.

Best, Fabian

2018-02-27 20:13 GMT+01:00 Esa Heikkinen <heikk...@student.tut.fi>:
> I'd want to read CSV files that contain time series data, where one column
> is a timestamp.
>
> Is it better to use addSource() (like in the data Artisans RideCleansing
> exercise) or CsvTableSource()?
>
> I am not sure whether CsvTableSource() can understand timestamps? I have
> not found good examples of that.
>
> It is maybe a little more work to write a CSV parser in the addSource()
> case?
>
> Best, Esa
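For completeness, the "correctly configured watermark assigner" idea can be illustrated without Flink. The sketch below models the common bounded-out-of-orderness strategy: the watermark trails the largest event timestamp seen so far by a fixed delay (the 5-second bound in main() is an arbitrary example value, not something from this thread):

```java
// A Flink-free sketch of a bounded-out-of-orderness watermark generator.
// The watermark promises that no record with a smaller timestamp is
// expected anymore; it never moves backwards, even for late records.
public class BoundedOutOfOrdernessWatermark {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedOutOfOrdernessWatermark(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Called for every record; remembers the largest event timestamp.
    public void onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
    }

    // The current watermark: max timestamp seen minus the allowed lateness.
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessWatermark wm = new BoundedOutOfOrdernessWatermark(5_000);
        wm.onEvent(10_000);
        wm.onEvent(12_000);
        wm.onEvent(11_000); // slightly out of order; watermark does not regress
        System.out.println(wm.currentWatermark()); // prints 7000
    }
}
```

If the files overlap in time too much relative to this bound, records older than the watermark would be dropped or routed to late-data handling, which is why the non-overlapping interval requirement above matters.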