Thanks for sharing. The query you gave is actually an interval join [1]. A windowed join, by contrast, joins two windowed streams together; see [2].
Theoretically, for an interval join, the state is cleaned up periodically based on the watermark and the allowed lateness, once the range of the RHS is considered “late”.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan

On Aug 29, 2020, at 12:59 AM +0800, Sofya T. Irwin <sof...@gmail.com> wrote:
> Hi Danny,
>
> Thank you for your response.
> I'm trying to join two streams that are both fairly high volume. My join
> looks like this:
>
> SELECT
>   A.rowtime as rowtime,
>   A.foo,
>   B.bar
> FROM A
> LEFT JOIN B
> ON A.foo = B.foo
> AND A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime
>
> When I run this SQL, the state size metric looks like a sawtooth that
> gradually keeps growing.
> I have currently disabled this query out of concern that it could impact
> other jobs.
>
> Based on your statement above, is the SQL timed window not supported?
> Is there another way I can make sure that the state only holds recent
> data?
>
> Thank you,
> Sofya
>
> On Thu, Aug 27, 2020 at 10:49 PM Danny Chan <danny0...@apache.org> wrote:
>
> > Hi, Sofya T. Irwin ~
> >
> > Can you share your use case and why you need a timed-window join there?
> >
> > The SQL timed-window join is not supported yet, and I would like to
> > hear from you whether it is necessary to support it in SQL.
> >
> > Sofya T. Irwin <sof...@gmail.com> wrote on Thu, Jul 30, 2020 at 10:44 PM:
> >
> > > Hi,
> > > I'm trying to investigate a SQL job using a time-windowed join that
> > > is exhibiting a large, growing state. The join syntax is most similar
> > > to the interval join
> > > (https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html).
> > >
> > > A few questions:
> > > 1. Am I correct in understanding that State TTL is generally not
> > > applicable for TableAPI&SQL? So we cannot use State TTL to limit
> > > state size for a join?
> > >
> > > 2. It seems that Flink should be able to expire state even without
> > > explicit settings based on this: "In TableAPI&SQL and DataStream, the
> > > window aggregation and time-windowed join will clear expired state
> > > using Timers which is triggered by watermark."
> > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > >
> > > To clarify: Does the above mean that Flink is expected to detect
> > > expired state and clear it without explicit configuration to allow it
> > > to do so?
> > >
> > > 3. I've looked into setting the idle state retention time. From what
> > > I can understand, this particular setting is appropriate for my use
> > > case. "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a
> > > job level configuration which will enable state ttl for all
> > > non-time-based operator states."
> > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > >
> > > To clarify: Would enabling this setting control state growth? Is this
> > > only available for the blink planner? Currently we are using the
> > > StreamPlanner. Is there any way to ensure that idle state has limited
> > > retention for applications using the StreamPlanner?
> > >
> > > Thanks ahead,
> > > Sofya
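
The watermark-based cleanup described for the interval join in the reply at the top of this thread can be illustrated with a toy model. This is a minimal Python sketch, not Flink's implementation: the class `IntervalJoinState`, its methods, and the convention that a watermark `w` means "no later row has rowtime <= w" are all assumptions made for illustration. It tracks only rowtimes (in milliseconds) and ignores the join key, allowed lateness, and the null padding of the LEFT JOIN.

```python
from dataclasses import dataclass, field

HOUR_MS = 60 * 60 * 1000  # width of the interval in the query: INTERVAL '1' HOUR


@dataclass
class IntervalJoinState:
    """Hypothetical toy model of the two row buffers of an interval join."""

    left: list = field(default_factory=list)   # rowtimes buffered from A
    right: list = field(default_factory=list)  # rowtimes buffered from B

    def add_left(self, rowtime: int) -> None:
        self.left.append(rowtime)

    def add_right(self, rowtime: int) -> None:
        self.right.append(rowtime)

    def on_watermark(self, watermark: int) -> None:
        # Assumption: after this watermark, no row with rowtime <= watermark arrives.
        # An A row at time t can match B rows with rowtime in [t, t + 1h],
        # so it must be kept until the watermark reaches t + 1h.
        self.left = [t for t in self.left if t + HOUR_MS > watermark]
        # A B row at time t can match A rows with rowtime in [t - 1h, t],
        # so it can be dropped as soon as the watermark reaches t.
        self.right = [t for t in self.right if t > watermark]

    def size(self) -> int:
        return len(self.left) + len(self.right)


state = IntervalJoinState()
state.add_left(0)
state.add_left(HOUR_MS)
state.add_right(0)
state.add_right(2 * HOUR_MS)
state.on_watermark(HOUR_MS)  # drops the A row at 0 and the B row at 0
```

In this model the buffers grow between watermarks and shrink at each cleanup, so a sawtooth shape is expected. The A-side buffer stays bounded by roughly one hour of rows only as long as the watermark keeps advancing.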