[ https://issues.apache.org/jira/browse/FLINK-16929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075878#comment-17075878 ]
Ori Popowski commented on FLINK-16929:
--------------------------------------

# {{SessionStartProcessFunction}} just emits events marking when the session is active/inactive to a side output
# The main output is used to output to {{sessionStartSink}}
# Here's a version without split:
{code:java}
KeyedStream<Foo, String> keyedStream = inputStream
    .keyBy(keySelector);

WindowedStream<Foo, String, TimeWindow> windowedStream = keyedStream
    .window(EventTimeSessionWindows.withGap(gap))
    .allowedLateness(allowedLateness);

SingleOutputStreamOperator<SessionEventFlat> sessionAggregation = windowedStream
    .aggregate(new SessionAggregateFunction(false),
               new SessionIncrementalAggregationProcessWindowFunction());

sessionAggregation
    .keyBy(SessionEventFlat::getSId)
    .addSink(sessionsSink)
    .setParallelism(4);
{code}
*But all of the above is irrelevant, because I've found the reason for the incorrect session cutoff.*

The reason is not the splitting (the side output), as I originally thought, and it has nothing to do with Flink 1.9 (it also happens in 1.5). We've found out that {{EventTimeSessionWindows}} triggers a new window when there are late events. I have no idea why this happens, because:
# We have an event-time watermark with 5 minutes of out-of-orderness.
# The session window is configured with an allowed lateness of 0.

According to the docs, late events should be _dropped_. Here it seems that they trigger a window instead. If we understand why this happens, we'll solve the problem. (Sketches of the side-output split and of a late-data check follow the quoted issue below.)

> Session Window produces sessions randomly
> -----------------------------------------
>
>                 Key: FLINK-16929
>                 URL: https://issues.apache.org/jira/browse/FLINK-16929
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.9.1
>            Reporter: Ori Popowski
>            Priority: Major
>         Attachments: image-2020-04-01-19-56-00-239.png, image-2020-04-01-19-56-27-720.png
>
>
> We have a Flink job which keys by session ID (sId) and uses a session window with a 30-minute gap:
> {code:java}
> inputStream
>   .keyBy(keySelector)
>   .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
>   .allowedLateness(Time.seconds(0L))
> {code}
> This Flink job reads from a Kinesis stream.
> Lately (I suspect after upgrading from 1.5.4 to 1.9.1) we get too many sessions, with gaps of several seconds instead of 30 minutes.
> We have no idea why it's happening and suspect a Flink bug or a state-backend bug (we use RocksDB).
> I haven't found any indication in the logs, except for some read-throughput warnings which were resolved by a backoff.
> Attached is a table of derived sessions, and then the raw events.
>
> *Sessions*
> !image-2020-04-01-19-56-00-239.png|width=753,height=406!
>
> *Events*
> !image-2020-04-01-19-56-27-720.png|width=312,height=383!
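For points 1–2 of the comment, a minimal sketch of the general pattern only, not the real {{SessionStartProcessFunction}}: a keyed process function emits active/inactive markers to a side output while the main output feeds {{sessionStartSink}}. The {{Tuple2<String, Boolean>}} marker type, the {{Foo.getSId()}} accessor and the element type accepted by {{sessionStartSink}} are assumptions, not taken from the job.
{code:java}
// Sketch of the side-output split, not the real SessionStartProcessFunction.
// Assumptions: Foo.getSId() exists and sessionStartSink accepts Foo elements.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// side output carries (session id, active?) markers
final OutputTag<Tuple2<String, Boolean>> markerTag =
    new OutputTag<Tuple2<String, Boolean>>("session-markers") {};

SingleOutputStreamOperator<Foo> mainOutput = inputStream
    .keyBy(keySelector)
    .process(new KeyedProcessFunction<String, Foo, Foo>() {
        @Override
        public void processElement(Foo event, Context ctx, Collector<Foo> out) {
            // emit an "active" marker for this session to the side output
            ctx.output(markerTag, Tuple2.of(event.getSId(), true));
            // forward the event itself on the main output
            out.collect(event);
        }
    });

// main output goes to the session-start sink, as described in point 2
mainOutput.addSink(sessionStartSink);

// the markers remain available separately via the side output
mainOutput.getSideOutput(markerTag).print();
{code}
How (or whether) the markers and the main stream continue into the session window is left out here, since the comment doesn't say.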
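For the late-event finding, a hedged sketch of one way to check it: {{WindowedStream#sideOutputLateData}} routes the records that the window operator drops as late into a side output, so they can be compared against the unexpectedly short sessions. {{Foo}}, {{keySelector}}, {{gap}} and the two aggregate functions come from the snippet in the comment; {{Foo.getEventTime()}} is a hypothetical accessor for the event timestamp.
{code:java}
// Sketch only (Flink 1.9): reuses Foo/keySelector/gap and the aggregate
// functions from the snippet above; Foo.getEventTime() is a hypothetical
// event-time accessor.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

final OutputTag<Foo> lateTag = new OutputTag<Foo>("late-events") {};

SingleOutputStreamOperator<SessionEventFlat> sessionAggregation = inputStream
    // watermark with 5 minutes of allowed out-of-orderness, as described above
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Foo>(Time.minutes(5)) {
            @Override
            public long extractTimestamp(Foo event) {
                return event.getEventTime();
            }
        })
    .keyBy(keySelector)
    .window(EventTimeSessionWindows.withGap(gap))
    .allowedLateness(Time.seconds(0L))
    // records dropped as late go to the side output instead of disappearing
    .sideOutputLateData(lateTag)
    .aggregate(new SessionAggregateFunction(false),
               new SessionIncrementalAggregationProcessWindowFunction());

// inspect what the window operator actually classified as late
DataStream<Foo> lateEvents = sessionAggregation.getSideOutput(lateTag);
lateEvents.print();
{code}
Comparing the timestamps in {{lateEvents}} with the boundaries of the unexpected sessions would show whether the records behind the short gaps are in fact the ones arriving behind the watermark.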