Thanks for sharing ~

The query you gave is actually an interval join [1]. A windowed join instead 
joins two windowed streams together, see [2].

In theory, for an interval join, the state is cleaned up periodically based 
on the watermark and the allowed lateness, once the matching time range on 
the RHS is considered “late”.
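
To make the cleanup rule concrete, here is a minimal Python sketch of how the 
buffered rows for your join condition could be pruned as the watermark 
advances. It is a toy model, not Flink's actual operator: the class 
IntervalJoinState and its methods are hypothetical names, it ignores allowed 
lateness and the LEFT JOIN null padding, and it prunes on every watermark 
rather than periodically.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000  # the '1' HOUR bound from the query, in milliseconds
MIN = 60 * 1000


class IntervalJoinState:
    """Toy model (hypothetical, not Flink code) of buffered rows for
    A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime."""

    def __init__(self, bound_ms=HOUR_MS):
        self.bound_ms = bound_ms
        self.a_rows = defaultdict(list)  # join key -> buffered A rowtimes
        self.b_rows = defaultdict(list)  # join key -> buffered B rowtimes

    def add_a(self, key, rowtime):
        self.a_rows[key].append(rowtime)

    def add_b(self, key, rowtime):
        self.b_rows[key].append(rowtime)

    def on_watermark(self, watermark):
        # An A row at time a can match B rows in [a, a + bound], so it is
        # kept while a + bound > watermark.
        self._prune(self.a_rows, watermark - self.bound_ms)
        # A B row at time b can match A rows in [b - bound, b], so it is
        # kept while b > watermark.
        self._prune(self.b_rows, watermark)

    @staticmethod
    def _prune(rows, cutoff):
        # Drop every buffered rowtime at or below the cutoff.
        for key in list(rows):
            rows[key] = [t for t in rows[key] if t > cutoff]
            if not rows[key]:
                del rows[key]

    def size(self):
        return sum(len(v) for v in self.a_rows.values()) + sum(
            len(v) for v in self.b_rows.values()
        )


# Usage: two A rows and one B row for the same key, then a watermark.
state = IntervalJoinState()
state.add_a("foo", 0)
state.add_a("foo", 30 * MIN)
state.add_b("foo", 10 * MIN)
state.on_watermark(20 * MIN)
print(state.size())
```

In this model the A side is retained for about one hour of event time and the 
B side only up to the current watermark, so the state size depends on the 
watermark actually advancing. If the watermark stalls, nothing is dropped.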

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan
On Aug 29, 2020 at 12:59 AM +0800, Sofya T. Irwin <sof...@gmail.com> wrote:
> Hi Danny,
>
> Thank you for your response.
> I'm trying to join two streams that are both fairly high volume. My join 
> looks like this:
>
>   SELECT
>     A.rowtime as rowtime,
>     A.foo,
>     B.bar
>   FROM A
>   LEFT JOIN B
>     ON A.foo = B.foo
>     AND A.rowtime BETWEEN B.rowtime - INTERVAL  '1' HOUR AND B.rowtime
>
> When I run this SQL, the state size metric looks like a sawtooth that 
> gradually keeps growing.
> Currently I disabled this query because of a concern it could impact other 
> jobs.
>
> Based on your statement above, the SQL timed window is not supported?
> Is there another way I can make sure that the state only holds recent 
> data?
>
> Thank you,
> Sofya
>
> > On Thu, Aug 27, 2020 at 10:49 PM Danny Chan <danny0...@apache.org> wrote:
> > > Hi, Sofya T. Irwin ~
> > >
> > > Can you share your use case and why you need a timed-window join there?
> > >
> > > The SQL timed-window join is not supported yet, and I would like to hear 
> > > whether you think it is necessary to support it in SQL.
> > >
> > >
> > > > On Thu, Jul 30, 2020 at 10:44 PM, Sofya T. Irwin <sof...@gmail.com> wrote:
> > > > > Hi,
> > > > > I'm trying to investigate a SQL job using a time-windowed join that 
> > > > > is exhibiting a large, growing state. The join syntax is most similar 
> > > > > to the interval join 
> > > > > (https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html).
> > > > >
> > > > > A few questions:
> > > > > 1. Am I correct in understanding that State TTL is generally not 
> > > > > applicable for TableAPI&SQL? So we cannot use State TTL to limit 
> > > > > state size for a join?
> > > > >
> > > > > 2. It seems that Flink should be able to expire state even without 
> > > > > explicit settings based on this: "In TableAPI&SQL and DataStream, the 
> > > > > window aggregation and time-windowed join will clear expired state 
> > > > > using Timers which is triggered by watermark."  
> > > > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > > > >
> > > > > To clarify: Does the above mean that Flink is expected to detect 
> > > > > expired state and clear it without explicit configuration to allow it 
> > > > > to do so?
> > > > >
> > > > > 3. I've looked into setting the idle state retention time. From what 
> > > > > I can understand, this particular setting is appropriate for my use 
> > > > > case.  "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a 
> > > > > job level configuration which will enable state ttl for all 
> > > > > non-time-based operator states." 
> > > > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > > > >
> > > > > To clarify: Would enabling this setting control state growth? Is this 
> > > > > only available for blink planner? Currently we are using the 
> > > > > StreamPlanner. Is there any way to ensure that idle state has limited 
> > > > > retention for applications using the StreamPlanner?
> > > > >
> > > > > Thanks ahead,
> > > > > Sofya
