Hi Dominik,

I think the problem could be that tumbling time windows don't start at the
timestamp of the first arriving event but are aligned to multiples of the
window length, counted from the epoch. So when you define a 90-day tumbling
window, you get windows covering days 0 - 89, 90 - 179, and so on. If your
data ranges from day 79 to day 109, it falls into two of these windows, which
would explain the two firings per key.

Cheers,
Till

On Mon, Mar 1, 2021 at 5:34 PM Dominik Wosiński <wos...@gmail.com> wrote:

> Hey,
> I have a question about a DataStream created from multiple files in S3. I
> have several files in AWS S3, say under the path s3://files/, with one folder
> per day, so the full paths look like s3://files/day=1/file.parquet,
> s3://files/day=2/file.parquet. I want to read all the files and sort the
> records by a specific field.
>
> I thought I could rely on the Long.MAX watermark that is emitted once the
> sources finish, so I decided to use an event-time window larger than the time
> span covered by the data in the files.
>
> So, I have something like:
>
> val inputFormat = new ParquetAvroInputFormat[TestData](new Path("s3a://files/"))
> inputFormat.setNestedFileEnumeration(true)
> val ste = StreamExecutionEnvironment.createLocalEnvironment(1)
> ste.createInput(inputFormat)
>   .assignTimestampsAndWatermarks(
>     new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
>   .keyBy(_.getKey)
>   .timeWindow(Time.days(90))
>   .sideOutputLateData(sideOutput)
>   .process(new ProcessWindowFunction[TestData, TestData, String, TimeWindow] {
>     override def process(key: String, context: Context,
>                          elements: Iterable[TestData],
>                          out: Collector[TestData]): Unit = {
>       println("Processing: " + elements.toList.size + " for key: " + key)
>       elements.toSeq.sortBy(_.getTimestamp)
>         .foreach(out.collect(_))
>     }
>   })
>
> *ParquetAvroInputFormat and OutOfOrdernessWatermarkStrategy are my own classes,
> but they do not cause the issue described here.*
> The data in the files spans 30 days, so there is no way the window can close
> before the files are fully read and the *Long.Max* watermark is generated.
>
> Now, the problem I am observing is this: since the parallelism is one, I would
> expect to see one message printed per key. But for some reason, for most of
> the keys, two windows are created. I have 30 unique keys and each key has
> around 1M records. The output I see is more or less like this:
>
> 1. Several messages about Switching to Random IO seek policy
> 2. Print for most of the keys present in the dataset (but the counts are
> quite small, most of them around 100k, some as small as few hundred)
> 3. More Switching to Random IO seek policy
> 4. Print again for some keys, but now the counts are much higher.
>
> So, the total count of all processed values is correct. I am just wondering
> why the window gets invoked twice.
>
> Thanks in advance,
> Best Regards,
> Dom.
>
