Hi Dominik,

I think the problem could be that tumbling time windows don't start at the timestamp of the first arriving event but are aligned to multiples of the window length. So when you define a 90-day tumbling window, you get windows covering days 0-89, 90-179, and so on. If your data ranges from day 79 to day 109, it falls into two of these windows.
Cheers,
Till

On Mon, Mar 1, 2021 at 5:34 PM Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> I have a question regarding a DataStream created from multiple files in
> S3. I have several files in AWS S3, say the path is s3://files/, and then
> there are several folders for different days, so in the end the full paths
> look like: s3://files/day=1/file.parquet, s3://files/day=2/file.parquet.
> I wanted to read all the files and sort them by some specific value.
>
> I thought that I could use the fact that the Long.MAX watermark is
> generated at the end of the input, so I decided to use an event-time
> window larger than the time range of the data in the files.
>
> So, I have something like:
>
> val inputFormat = new ParquetAvroInputFormat[TestData](new Path("s3a://files/"))
> inputFormat.setNestedFileEnumeration(true)
>
> val ste = StreamExecutionEnvironment.createLocalEnvironment(1)
> ste.createInput(inputFormat)
>   .assignTimestampsAndWatermarks(
>     new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
>   .keyBy(_.getKey)
>   .timeWindow(Time.days(90))
>   .sideOutputLateData(sideOutput)
>   .process(new ProcessWindowFunction[TestData, TestData, String, TimeWindow] {
>     override def process(key: String, context: Context,
>                          elements: Iterable[TestData],
>                          out: Collector[TestData]): Unit = {
>       println("Processing: " + elements.toList.size + " for key: " + key)
>       elements.toSeq.sortBy(_.getTimestamp)
>         .foreach(out.collect(_))
>     }
>   })
>
> ParquetAvroInputFormat and OutOfOrdernessWatermarkStrategy are my own
> classes, but those do not cause the issue described here.
> The data in the files spans 30 days, so there is no way the window will
> be closed before the files are fully read and the Long.MAX timestamp is
> generated.
>
> Now, the problem I am observing is that I would expect to see one message
> printed per key, since the parallelism is one. But for some reason I am
> observing that for some of the keys (most of them, really) two windows
> are created.
> I have 30 unique keys and each key contains around 1M records. The output
> I can see is more or less like this:
>
> 1. Several messages about "Switching to Random IO seek policy".
> 2. A print for most of the keys present in the dataset (but the counts
>    are quite small, most of them around 100k, some as small as a few
>    hundred).
> 3. More "Switching to Random IO seek policy" messages.
> 4. A print again for some keys, but now the counts are much higher.
>
> So, the total count of all processed values is correct. I am just
> interested in why the window gets invoked twice.
>
> Thanks in advance,
> Best Regards,
> Dom.
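The epoch alignment Till describes can be illustrated without Flink at all. The sketch below (object and value names are mine, for illustration only) uses the same window-start arithmetic that tumbling windows with a zero offset use for non-negative timestamps, and shows why a 30-day span of data starting at day 79 straddles two 90-day windows:

```scala
object WindowAlignment {
  val dayMs: Long = 24L * 60 * 60 * 1000
  val windowSizeMs: Long = 90 * dayMs

  // Tumbling windows are aligned to the epoch (timestamp 0), not to the
  // first event: the window start is the largest multiple of the window
  // size that is <= the event timestamp (for non-negative timestamps).
  def windowStart(timestampMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  def main(args: Array[String]): Unit = {
    // Data spanning days 79..109 is only 30 days wide, yet it straddles
    // the boundary between window [day 0, day 90) and [day 90, day 180).
    println(windowStart(79 * dayMs) / dayMs)   // prints 0  -> first window
    println(windowStart(109 * dayMs) / dayMs)  // prints 90 -> second window
  }
}
```

Note that shifting the alignment (e.g. with the offset variant of `TumblingEventTimeWindows.of`) only moves the boundaries; any fixed-size tumbling window can still split a dataset that happens to cross one of them.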