Hi Jiahui,

Query hints are a way to do fine-grained configuration. Just out of curiosity, is it a strong requirement that users be able to configure a different IDLE_STATE_RETENTION_TIME for each operator?

Best,
Godfrey

Jiahui Jiang <qzhzm173...@hotmail.com> wrote on Tue, Apr 14, 2020 at 2:07 AM:

> Also for some more context, we are building a framework to help users
> build their Flink pipelines with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> needing any Flink knowledge.
>
> One issue we encountered was that, for some of the streams, the key domain
> keeps evolving and we want to expire the state for older keys. But there
> is no easy way to let users configure their state timeout directly
> through the SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint; our framework then parses it and sets it up during
> table registration time.
>
> An example query that users can write right now looks like:
>
> CREATE TABLE `/output` AS
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
> FROM `/input1` a
> INNER JOIN `/input2` b
> ON a.column_name = b.column_name;
>
> Is this something Flink SQL may want to support out of the box? (Starting
> from Calcite 1.22.0
> <https://calcite.apache.org/news/2020/03/05/release-1.22.0/>, it started
> to provide first-class hint parsing.)
>
> ------------------------------
> *From:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu <imj...@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity, even though I agree that having too many config classes
> is confusing, not knowing when the config values are used during pipeline
> setup is also pretty confusing.
> For example, the name of 'TableConfig'
> makes me feel it's global to the whole tableEnvironment (which is true) but is
> only read once at execution (which is not true). Can we try to surface or
> add some documentation on when these configs are read? 😄
>
> Thank you so much!
> ------------------------------
> *From:* Jark Wu <imj...@gmail.com>
> *Sent:* Saturday, April 11, 2020 8:45 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Yes, that's right. Setting idleStateRetentionTime on TableConfig before
> translation should work.
>
> On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Thank you for answering! I was reading
> StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when
> trying to convert tables to DataStreams, planner.translate is taking the
> current tableConfig into account (as in, it reads the current tableConfig
> content even though it's not explicitly passed in as an argument for
> translate). So it seems like if I set tableConfig right before converting to
> DataStreams, that should work?
>
> Or did you mean the actual tableEnvironment.execute()? We have a
> whole pipeline with multiple queries that also depend on each other, so we
> have to have all the continuous queries executing concurrently.
>
> Thanks again!
> ------------------------------
> *From:* Jark Wu <imj...@gmail.com>
> *Sent:* Saturday, April 11, 2020 1:24 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> QueryConfig is deprecated and will be removed in the future, because it is
> confusing that the Table API has so many different config classes.
> If you want to set different idleStateRetentionTime for different queries,
> you can set a new idleStateRetentionTime on TableConfig before
> executing/submitting the query.
>
> Best,
> Jark
>
> On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Just looked into the source code a bit further and realized that for
> StreamTableEnvironmentImpl, even for sinks it's also doing translation
> lazily. Is there any way to have different transformations use different
> queryConfigs?
> ------------------------------
> *From:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Sent:* Friday, April 10, 2020 6:46 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Setting different idleStateRetentionTime for different queries
> executed in the same TableEnvironment in Flink 1.10
>
> Hello! I'm using the Table API to write a pipeline with multiple queries, and
> I want to set up a different idleStateRetentionTime for different queries.
>
> In Flink 1.8, it seems that I can pass in a
> streamQueryConfig when converting each output table into DataStreams, and
> the translation will take the idleStateRetentionTime into account.
>
> But in Flink 1.10, that idleStateRetentionTime actually gets set on
> TableConfig and applies to the whole tableEnvironment.
>
> Is there a way to have different idleStateRetentionTime for different
> queries in 1.10?
>
> I saw tableEnvironment.insertInto(sink, queryConfig) still allows eager
> translation. But does that mean if I have multiple sinks for the same
> DataStream with different idleStateRetentionTime configurations, that
> will cause the transformation to be executed multiple times?
>
> Thank you!
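
For readers following the thread, here is a minimal sketch of how a framework might pull the custom hint out of a query string before table registration. It is not the code Jiahui's framework uses. The hint name and the `minTime`/`maxTime` keys come from the example query above; the function names and the supported duration suffixes (`s`, `m`, `h`, `d`) are assumptions made for illustration.

```python
import re
from datetime import timedelta

# Hint name and option keys follow the example query in the thread.
# Everything else (function names, accepted duration units) is hypothetical.
_HINT_RE = re.compile(
    r"/\*\+\s*IDLE_STATE_RETENTION_TIME\s*\((.*?)\)\s*\*/",
    re.IGNORECASE | re.DOTALL,
)
_OPTION_RE = re.compile(r"(\w+)\s*=\s*'([^']*)'")
_DURATION_RE = re.compile(r"^(\d+)\s*([smhd])$")
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_duration(text):
    """Convert a string such as '5m' into a timedelta (assumed format)."""
    match = _DURATION_RE.match(text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(seconds=int(amount) * _UNIT_SECONDS[unit])


def extract_idle_state_retention(sql):
    """Return (min_time, max_time) from the hint, or None if there is no hint."""
    hint = _HINT_RE.search(sql)
    if hint is None:
        return None
    options = dict(_OPTION_RE.findall(hint.group(1)))
    missing = {"minTime", "maxTime"} - options.keys()
    if missing:
        raise ValueError(f"hint is missing options: {sorted(missing)}")
    return parse_duration(options["minTime"]), parse_duration(options["maxTime"])


query = """
CREATE TABLE `/output` AS
SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
FROM `/input1` a
INNER JOIN `/input2` b
ON a.column_name = b.column_name;
"""
retention = extract_idle_state_retention(query)
```

Following Jark's suggestion, the extracted pair would then be set on the TableConfig right before that particular query is translated, and set again before the next query.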