Hey, I have a question regarding a DataStream created from multiple files in S3. I have several files in AWS S3; the base path is s3://files/, and under it there are folders for different days, so the full paths look like s3://files/day=1/file.parquet, s3://files/day=2/file.parquet, and so on. I want to read all the files and sort the records by a specific value.
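For concreteness, here is a minimal stand-in for the record type used below (illustrative only: my real TestData is generated from an Avro schema, and only the two getters matter for this question):

    // Hypothetical stand-in for the Avro-generated record;
    // only getKey and getTimestamp are relevant here.
    case class TestData(key: String, timestamp: Long) {
      def getKey: String = key
      def getTimestamp: Long = timestamp
    }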
I thought I could use the fact that a Long.MAX_VALUE watermark is generated when a bounded input finishes, so I decided to use an event-time window larger than the time span of the data in the files. So I have something like:

    val inputFormat = new ParquetAvroInputFormat[TestData](new Path("s3a://files/"))
    inputFormat.setNestedFileEnumeration(true)

    val ste = StreamExecutionEnvironment.createLocalEnvironment(1)
    ste.createInput(inputFormat)
      .assignTimestampsAndWatermarks(
        new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
      .keyBy(_.getKey)
      .timeWindow(Time.days(90))
      .sideOutputLateData(sideOutput)
      .process(new ProcessWindowFunction[TestData, TestData, String, TimeWindow] {
        override def process(key: String,
                             context: Context,
                             elements: Iterable[TestData],
                             out: Collector[TestData]): Unit = {
          println("Processing: " + elements.toList.size + " for key: " + key)
          elements.toSeq.sortBy(_.getTimestamp).foreach(out.collect)
        }
      })

(ParquetAvroInputFormat and OutOfOrdernessWatermarkStrategy are my own classes, but they are not the cause of the issue described here.)

The data in the files spans at most 30 days, so there is no way the window can close before all files have been read and the Long.MAX_VALUE watermark is emitted.

Now, the problem I am observing: since the parallelism is one, I would expect to see exactly one message printed per key. But for some reason, for most of the keys, two windows are created. I have 30 unique keys, and each key has around 1M records. The output looks more or less like this:

1. Several "Switching to Random IO seek policy" messages.
2. A print for most of the keys in the dataset, but with quite small counts: most around 100k, some as small as a few hundred.
3. More "Switching to Random IO seek policy" messages.
4. Prints again for some of the keys, this time with much higher counts.

The total count of all processed values is correct; I am just interested in why the window function gets invoked twice.

Thanks in advance,
Best Regards,
Dom.
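P.S. For readers unfamiliar with the custom class: a watermark strategy with this constructor shape can be sketched on top of Flink's BoundedOutOfOrdernessTimestampExtractor from the older watermark API (an illustrative sketch, not my exact implementation):

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    // Assigns timestamps via the supplied function and emits watermarks
    // that trail the maximum seen timestamp by a fixed out-of-orderness bound.
    class OutOfOrdernessWatermarkStrategy[T](maxOutOfOrdernessMs: Long, extract: T => Long)
      extends BoundedOutOfOrdernessTimestampExtractor[T](Time.milliseconds(maxOutOfOrdernessMs)) {
      override def extractTimestamp(element: T): Long = extract(element)
    }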