Hi,

*Problem description*

I have two file sources with the same format; each produces at most one new file per tumbling window, and I need to merge the data from those two sources. My operator chain is as follows:

FileReader1 --> Parser --\
                          Union -> WindowFunction (tumbling, merge)
FileReader2 --> Parser --/

The parser is implemented within the "nextRecord()" method of my custom FileInputFormat (MyInputFormatOriginal).
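For context, here is roughly how this 1st variant is wired up. This is only a simplified sketch: MyRecord, its key field, the paths, the scan interval, the one-minute window, and MergeWindowFunction are placeholders standing in for my actual setup.

/
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class MyRecord(key: String, payload: String) // placeholder record type

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Each source monitors its own directory; parsing happens inside
// MyInputFormatOriginal.nextRecord(), so the union carries parsed records.
val stream1: DataStream[MyRecord] =
  env.readFile(new MyInputFormatOriginal(), "hdfs:///source1",
    FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)
val stream2: DataStream[MyRecord] =
  env.readFile(new MyInputFormatOriginal(), "hdfs:///source2",
    FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)

// Union the parsed streams and merge them per key in a tumbling window.
stream1.union(stream2)
  .keyBy(_.key)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .process(new MergeWindowFunction()) // placeholder for my merge logic
/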
Worried that if the two streams get out of sync, parsed data from one stream would queue up in memory waiting for the other, I tried to change the chain to:

FileReader1 --\
               Union -> WindowFunction (tumbling, parse, merge)
FileReader2 --/

The two streams being unioned are now just DataStream[FileInputSplit]. My new, simplified custom FileInputFormat (MyInputFormatLite) looks like this:

/
class MyInputFormatLite extends FileInputFormat[FileInputSplit] {
  private var data: FileInputSplit = _
  private var end: Boolean = false

  override def open(split: FileInputSplit): Unit = {
    this.end = false
    this.data = split
    // note: I deliberately do not call super.open() here
  }

  // Emit the split itself as the single record, then signal end-of-input.
  override def nextRecord(reuse: FileInputSplit): FileInputSplit = {
    this.end = true
    this.data
  }

  override def reachedEnd(): Boolean = this.end
}
/

In the WindowFunction, I then actually read the files using my existing parser class (MyInputFormatOriginal); a sketch of the full wiring of this variant is in the P.S. below:

/
class MyWindowFunction extends ProcessWindowFunction[FileInputSplit, ...] {
  private lazy val reader = new MyInputFormatOriginal()

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    reader.setRuntimeContext(this.getRuntimeContext)
  }

  override def process(key: String, context: Context,
                       elements: Iterable[FileInputSplit], out: Collector[...]): Unit = {
    elements.foreach { split =>
      reader.open(split)
      while (!reader.reachedEnd()) {       // emit every record in the split
        out.collect(reader.nextRecord(SomeEmptySplit))
      }
      reader.close()
    }
  }
}
/

*Issues*

The 2nd implementation is processing the files well, but there are two big issues:
1. Throughput is only half that of the initial implementation (I count the number of files processed while checkpointing is not running).
2. Checkpointing gets stuck at the window function. In the 1st implementation, with double the amount of data processed, each checkpoint takes about 1-2 minutes; in the 2nd one, I have waited for up to 30 minutes without seeing any subtask complete the checkpoint.

Could you please help me find what is wrong in that 2nd implementation?

Thanks and best regards,
Averell
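P.S. For completeness, here is roughly how the 2nd variant is wired up. Again only a simplified sketch: the paths, the scan interval, the one-minute window, and keying by file name are placeholders standing in for my actual setup.

/
import org.apache.flink.core.fs.FileInputSplit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The sources now only forward splits; no parsing happens here.
val splits1: DataStream[FileInputSplit] =
  env.readFile(new MyInputFormatLite(), "hdfs:///source1",
    FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)
val splits2: DataStream[FileInputSplit] =
  env.readFile(new MyInputFormatLite(), "hdfs:///source2",
    FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)

// Union the raw split streams; parsing and merging both happen in the window.
splits1.union(splits2)
  .keyBy(split => split.getPath.getName) // placeholder key extraction
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .process(new MyWindowFunction())
/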