Hi,

*Problem description*
I have two file sources with the same format; each has at most one new file
per tumbling window, and I need to merge the data from those two sources.
My operator chain is as follows:
       FileReader1 --> Parser --\
                                 Union --> WindowFunction (tumbling, merge)
       FileReader2 --> Parser --/
The parser is implemented within the nextRecord() method of my custom
FileInputFormat (MyInputFormatOriginal).
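
For reference, the first topology is wired up roughly like the sketch below.
The paths, the 10-second watch interval, the key selector (record.key), the
10-minute processing-time window, and the MyMergeFunction name are all
simplified placeholders, not my real values:

        import org.apache.flink.streaming.api.scala._
        import org.apache.flink.streaming.api.functions.source.FileProcessingMode
        import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
        import org.apache.flink.streaming.api.windowing.time.Time

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Each source monitors its own directory; parsing happens inside
        // MyInputFormatOriginal.nextRecord(), so both streams carry parsed records
        val stream1 = env.readFile(new MyInputFormatOriginal(),
                "hdfs:///data/source1", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
        val stream2 = env.readFile(new MyInputFormatOriginal(),
                "hdfs:///data/source2", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)

        // Union the two parsed streams, then merge matching records per window
        stream1.union(stream2)
                .keyBy(record => record.key)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
                .process(new MyMergeFunction())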

Worried that the two streams might not stay in sync, with parsed data from
one stream queuing up in memory while waiting for the other, I tried
changing the chain to:
       FileReader1 --\
                      Union --> WindowFunction (tumbling, parse, merge)
       FileReader2 --/
The two streams being unioned are now just DataStream[FileInputSplit]. My
new, simplified custom FileInputFormat (MyInputFormatLite) looks like this:

        class MyInputFormatLite extends FileInputFormat[FileInputSplit] {
                private var data: FileInputSplit = _
                private var end: Boolean = false

                override def open(split: FileInputSplit): Unit = {
                        this.end = false
                        this.data = split
                        // I don't call super.open() here, so the file itself is never opened
                }

                // Emit the split itself as the one and only "record" of the file
                override def nextRecord(reuse: FileInputSplit): FileInputSplit = {
                        this.end = true
                        this.data
                }

                override def reachedEnd(): Boolean = this.end
        }
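
The two split-only sources are then wired up like this (same placeholder
caveats as the earlier sketch; keying by file name is just for illustration):

        // The sources now emit only the FileInputSplit itself, one record per file
        val splits1 = env.readFile(new MyInputFormatLite(),
                "hdfs:///data/source1", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
        val splits2 = env.readFile(new MyInputFormatLite(),
                "hdfs:///data/source2", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)

        // Parsing is deferred to the window function
        splits1.union(splits2)
                .keyBy(split => split.getPath.getName)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
                .process(new myWindowFunction())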

In the WindowFunction, I actually read the files using my existing parser
class (MyInputFormatOriginal):
        class myWindowFunction extends ProcessWindowFunction[FileInputSplit, ...] {
                private lazy val reader = new MyInputFormatOriginal()

                override def open(parameters: Configuration): Unit = {
                        super.open(parameters)
                        reader.setRuntimeContext(this.getRuntimeContext)
                }

                override def process(key: String, context: Context,
                                     elements: Iterable[FileInputSplit],
                                     out: Collector[...]): Unit = {
                        // Open each split with the original format and parse it here
                        elements.foreach { split =>
                                reader.open(split)
                                out.collect(reader.nextRecord(SomeEmptySplit))
                                reader.close()
                        }
                }
        }
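
The idea behind this change is that parsing is deferred until the window
fires, so records from the faster stream no longer sit parsed in memory
while waiting for the slower one; the window only buffers one small
FileInputSplit element per file.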

*ISSUES*
My 2nd implementation processes the files correctly, but there are two big
issues:
        1. The throughput is only half that of the initial implementation
(I count the number of files processed while checkpointing is not running).
        2. Checkpointing gets stuck at the window function. In the 1st
implementation, with double the amount of data processed, each checkpoint
takes about 1-2 minutes, while in the 2nd one I have waited up to 30
minutes without seeing a single subtask complete the checkpoint.

Could you please help me find what is wrong in that 2nd implementation?

Thanks and best regards,
Averell


