That does not matter.

2018-03-01 13:32 GMT+01:00 Esa Heikkinen <esa.heikki...@student.tut.fi>:
> Hi
>
> Should the custom source function be written in Java rather than Scala,
> as in the RideCleansing exercise?
>
> Best, Esa
>
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, March 1, 2018 11:23 AM
> To: Esa Heikkinen <esa.heikki...@student.tut.fi>
> Cc: user@flink.apache.org
> Subject: Re: Reading csv-files
>
> Hi Esa,
>
> IMO, the easiest approach would be to implement a custom source function
> that reads the CSV files line-wise (in the correct timestamp order) and
> extracts timestamps.
>
> At the end of each file, you can emit a watermark.
>
> The order of files can either be hardcoded or determined from the file
> name.
>
> This approach is similar to the source function in the RideCleansing
> exercise [1] (without the alignment of timestamps with the actual time).
>
> Once you have a DataStream with correctly assigned timestamps and
> watermarks, you should be able to use the CEP library.
>
> Best, Fabian
>
> [1] https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/CheckpointedTaxiRideSource.java
>
> 2018-02-28 10:47 GMT+01:00 Esa Heikkinen <esa.heikki...@student.tut.fi>:
>
> Because I do not have time to learn all the features of Flink, and because
> my case may have some issues of its own, I am very interested in
> implementing an external "logs replayer" or some batch-to-stream data
> converter.
>
> Do you have any ideas or suggestions on how to build this kind of logs
> replayer? Or is there perhaps a ready-made one?
> Could Kafka do something like this?
>
> I think I could also write this logs replayer in Python.
>
> What kind of parallel streams would be best and easiest for Flink?
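Fabian's suggestion above can be prototyped outside Flink. It has three parts: read the CSV files line-wise in timestamp order, derive the file order from the file name, and emit a watermark at the end of each file. This is also close to the Python logs replayer Esa mentions. The sketch below is only a plain-Python model of that logic, not Flink code. Several things in it are assumptions rather than details from the thread: the `_<millis>.csv` file-name convention, the epoch-millisecond first column, and the hypothetical names `file_order_key` and `replay`. In an actual Flink `SourceFunction`, the two event kinds would roughly correspond to `SourceContext.collectWithTimestamp()` and `SourceContext.emitWatermark()`.

```python
import csv
import re
from typing import Dict, Iterable, Iterator, List, Tuple, Union

# Assumed naming convention: the file name ends with the start time of the
# file's interval in epoch milliseconds, e.g. "log_1519900000000.csv".
_NAME_TS = re.compile(r"_(\d+)\.csv$")

Record = Tuple[str, int, List[str]]  # ("record", timestamp_ms, other_fields)
Watermark = Tuple[str, int]          # ("watermark", timestamp_ms)


def file_order_key(name: str) -> int:
    """Return the timestamp encoded in a file name (assumed convention)."""
    match = _NAME_TS.search(name)
    if match is None:
        raise ValueError(f"no timestamp in file name: {name}")
    return int(match.group(1))


def replay(files: Dict[str, Iterable[str]]) -> Iterator[Union[Record, Watermark]]:
    """Read files line-wise in file-name order and emit a watermark per file.

    `files` maps a file name to its lines, e.g. {p: open(p, newline="")}.
    Assumes the first CSV column is an epoch-millisecond timestamp, rows
    inside a file are sorted, and file intervals do not overlap.
    """
    for name in sorted(files, key=file_order_key):
        max_ts = None
        for row in csv.reader(files[name]):
            if not row:
                continue  # skip blank lines
            ts = int(row[0])
            max_ts = ts if max_ts is None else max(max_ts, ts)
            yield ("record", ts, row[1:])
        if max_ts is not None:
            # With non-overlapping files, every remaining record has a larger
            # timestamp, so max_ts is a safe watermark after this file.
            yield ("watermark", max_ts)


if __name__ == "__main__":
    demo = {"log_200.csv": ["200,b", "250,c"], "log_100.csv": ["100,a", "150,a2"]}
    for event in replay(demo):
        print(event)
```

Emitting the watermark only at file boundaries is safe only under the non-overlap condition Fabian describes elsewhere in the thread. If neighbouring files overlap, some records would arrive behind the watermark.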
>
> By the way, I am writing a conference paper comparing Flink with my
> LOGDIG log file analyzer, which is described in my earlier paper (LOGDIG Log
> File Analyzer for Mining Expected Behavior from Log Files):
>
> https://www.researchgate.net/profile/Timo_Haemaelaeinen/publication/283264599_LOGDIG_Log_File_Analyzer_for_Mining_Expected_Behavior_from_Log_Files/links/562f7ea208ae4742240ae977.pdf
>
> LOGDIG is a very simple and slow analyzer, and at the moment it runs only
> on a local computer, but it is capable of analyzing very complex cases from
> many parallel log files. LOGDIG's analysis is close to CEP. I have written
> it in Python.
>
> I don't know whether Flink is the best benchmarking target, but I do not
> know a better one. I also tried Spark, but it had its own problems. For
> example, CEP support in Spark is not as good as in Flink.
>
> Best, Esa
>
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Tuesday, February 27, 2018 11:27 PM
> To: Esa Heikkinen <esa.heikki...@student.tut.fi>
> Cc: user@flink.apache.org
> Subject: Re: Reading csv-files
>
> Yes, that is mostly correct.
> You can of course read files in parallel, assign watermarks, and obtain a
> DataStream with correct timestamps and watermarks.
> If you do that, you should ensure that each parallel source task reads
> the files in order of increasing timestamps.
>
> As I said before, you can do that by providing a custom InputSplitAssigner
> that hands out the splits in order of their timestamps.
> The timestamp order would need to be encoded in the file name because the
> assigner cannot look into the file.
>
> Reading unsplit files in a single task makes the problem a bit easier
> to handle, but parallel reads are also possible.
>
> The RideCleansing example that you are referring to does not have these
> problems because the source reads the data in a single thread from a single
> file.
> This is done in order to avoid all the issues that I described before.
>
> Best, Fabian
>
> 2018-02-27 22:14 GMT+01:00 Esa Heikkinen <heikk...@student.tut.fi>:
>
> Hi
>
> Thanks for the answer. All csv-files are already present, and they will not
> change during processing.
>
> Because Flink can read many streams in parallel, I think it is also
> possible to read many csv-files in parallel.
>
> From what I have understood, it is possible to convert csv-files to
> streams internally in Flink? But the problem may be how to synchronize
> the parallel reading of csv-files based on timestamps?
>
> Maybe I should develop an external "replayer" of csv-files, which
> generates parallel streams of events (based on timestamps) for Flink?
>
> But I think the "replayer" could also be built with Flink, and it could
> also run at an accelerated speed?
>
> The RideCleansing example does something like that, but I don't know
> whether it is otherwise appropriate for my purpose.
>
> Best, Esa
>
> On 27.2.2018 at 22:32, Fabian Hueske wrote:
>
> Hi Esa,
>
> Reading timestamped records from files, when watermarks are needed, can be
> tricky.
>
> If you are aware of Flink's watermark mechanism, you know that records
> should be ingested in (roughly) increasing timestamp order.
>
> This means that files usually cannot be split (i.e., they need to be read
> by a single task from start to end) and also need to be read in the right
> order (files with smaller timestamps first).
>
> Also, each file should contain records of a certain time interval that
> should not overlap (too much) with the time intervals of other files.
>
> Unfortunately, Flink does not provide good built-in support for reading
> files in a specific order.
>
> If all files that you want to process are already present, you can
> implement a custom InputFormat by extending a CsvInputFormat, set
> unsplittable to true, and override getInputSplitAssigner() to return an
> assigner that returns the splits in the correct order.
>
> If you want to process files as they appear, things might be a bit easier,
> given that the timestamps in each new file are larger than the timestamps
> in the previous files. In this case, you can use
> StreamExecutionEnvironment.readFile() with the interval and
> FileProcessingMode parameters. With a correctly configured watermark
> assigner, it should be possible to get valid watermarks.
>
> In any case, reading timestamped data from files is much trickier than
> ingesting data from an event log, which provides the events in the same
> order in which they were written.
>
> Best, Fabian
>
> 2018-02-27 20:13 GMT+01:00 Esa Heikkinen <heikk...@student.tut.fi>:
>
> I'd like to read csv-files that contain time series data, where one column
> is a timestamp.
>
> Is it better to use addSource() (as in the data Artisans RideCleansing
> exercise) or CsvTableSource()?
>
> I am not sure whether CsvTableSource() can understand timestamps; I have
> not found good examples of that.
>
> Is it perhaps a little more work to write a csv parser in the addSource()
> case?
>
> Best, Esa
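Esa's last question is whether the addSource() route means writing one's own CSV parser. As a rough illustration (plain Python, outside Flink), turning a timestamp column into epoch milliseconds takes only a few lines.

The sketch also addresses the earlier concern about synchronizing parallel reads by timestamp. It merges several already time-sorted CSV inputs into one timestamp-ordered stream, which is one way an external replayer could handle that. The k-way merge with Python's `heapq.merge` is a substitute technique chosen for this sketch, not something proposed in the thread. The sketch assumes:

- header-less files, each sorted by time;
- ISO-8601 timestamps, with values that carry no UTC offset treated as UTC;
- the hypothetical names `parse_rows`, `replay_merged`, and `speedup`.

```python
import csv
import heapq
import time
from datetime import datetime, timezone
from typing import Iterable, Iterator, List, Optional, Tuple


def parse_rows(lines: Iterable[str], ts_col: int = 0) -> Iterator[Tuple[int, List[str]]]:
    """Parse CSV lines and convert the timestamp column to epoch milliseconds.

    Assumes header-less input with ISO-8601 timestamps such as
    "2018-02-27T20:13:00"; timestamps without a UTC offset are treated as UTC.
    """
    for row in csv.reader(lines):
        if not row:
            continue  # skip blank lines
        dt = datetime.fromisoformat(row[ts_col])
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        yield round(dt.timestamp() * 1000), row


def replay_merged(sources: List[Iterable[str]], ts_col: int = 0,
                  speedup: Optional[float] = None) -> Iterator[List[str]]:
    """Merge time-sorted CSV inputs into one stream in timestamp order.

    heapq.merge only yields a globally sorted stream if every input is
    already sorted by timestamp. With `speedup` set, the original gaps
    between events are replayed `speedup` times faster; None means no delay.
    """
    streams = [parse_rows(lines, ts_col) for lines in sources]
    prev_ts = None
    for ts, row in heapq.merge(*streams, key=lambda event: event[0]):
        if speedup and prev_ts is not None:
            time.sleep(max(0.0, (ts - prev_ts) / 1000.0 / speedup))
        prev_ts = ts
        yield row
```

For example, `replay_merged([open("a.csv", newline=""), open("b.csv", newline="")], speedup=60.0)` would replay the events of two hypothetical files roughly 60 times faster than real time. Where the rows go next (a socket, a message queue, or a single ordered file that Flink reads) is left open.

The design trade-off is that merging outside Flink yields one already-ordered stream. This sidesteps ordering across parallel readers, but the replay itself runs in a single process.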