Hi Jiahui,

Thanks for your suggestions. I think we may need a more detailed explanation of the behavior change. Regarding "supporting query configuration using Hints", I think it's one possible approach, but it needs more discussion.
Best,
Godfrey

On Tue, 14 Apr 2020 at 19:46, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:

> Yep yep :) I'm aware that the difference between the Blink and legacy Flink planners here applies only to sinks.
>
> But since, at the API level, toDataStream doesn't take a query-level config, it's easy for people to assume they can't control it on a per-query basis without digging into the source code.
>
> I have two questions / suggestions here:
>
> 1. Since StreamQueryConfig is deprecated and we want to consolidate config classes, could we add an additional endpoint like .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at least add some Javadoc so that I don't have to worry about the behavior under the hood suddenly changing?
> 2. What do we think about making query configuration via Hints a first-class supported Flink feature?
>
> Thank you so much 😊
> ------------------------------
> *From:* godfrey he <godfre...@gmail.com>
> *Sent:* Tuesday, April 14, 2020 3:20 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* Jark Wu <imj...@gmail.com>; user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> I think this is a problem of multiple-sink optimization. If we optimize each sink eagerly (that is, we optimize the query when `writeToSink` or `insertInto` is called), `TableConfig#setIdleStateRetentionTime` is functionally equivalent to QueryConfig; it just requires calling `TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or `insertInto`. However, with multiple-sink optimization, it's hard to map the value of `TableConfig#setIdleStateRetentionTime` to each query. I think per-query configuration is a general problem for multiple-sink optimization.
>
> But for the `toRetractStream` method, we keep the eager optimization strategy.
> So you can call `TableConfig#setIdleStateRetentionTime` before `toRetractStream`.
>
> Best,
> Godfrey
>
> On Tue, 14 Apr 2020 at 12:15, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
>
> Hey Godfrey, in some of our users' use cases, they have a couple of complex join queries where the key domains keep evolving; we definitely want some sort of state retention for those queries. But there are other queries where the key domain doesn't evolve over time, yet there is no guarantee on the maximum gap between two records with the same key appearing in the stream, so we don't want to accidentally invalidate the state for those keys.
>
> Because queries with different requirements can coexist in the same pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per operator.
>
> Just wondering, has a similar requirement not come up much for SQL users before? (Being able to set table / query configuration inside SQL queries.)
>
> We are also a little concerned because, now that 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the fact that TableConfig is read during toDataStream feels like relying on an implementation detail that just happens to work, with no guarantee that it will keep working in future versions...
>
> Thanks!
> ------------------------------
> *From:* godfrey he <godfre...@gmail.com>
> *Sent:* Monday, April 13, 2020 9:51 PM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* Jark Wu <imj...@gmail.com>; user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> Query hints are one way to do fine-grained configuration. Just out of curiosity, is it a strong requirement that users need to configure a different IDLE_STATE_RETENTION_TIME for each operator?
>
> Best,
> Godfrey
>
> On Tue, 14 Apr 2020 at 02:07, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
>
> Also, for some more context: we are building a framework to help users build their Flink pipelines with SQL. Our framework handles all the setup and configuration, so users only need to write SQL queries without needing any Flink knowledge.
>
> One issue we encountered is that for some of the streams, the key domain keeps evolving and we want to expire the state for older keys. But there is no easy way to let users configure their state timeout directly through the SQL APIs. Currently we ask users to configure idleStateRetentionTime in a custom SQL hint; our framework then parses it and applies it at table registration time.
>
> An example query that users can write right now looks like:
>
> CREATE TABLE `/output` AS
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
> FROM `/input1` a
> INNER JOIN `/input2` b
> ON a.column_name = b.column_name;
>
> Is this something Flink SQL may want to support out of the box? (Starting from Calcite 1.22.0 <https://calcite.apache.org/news/2020/03/05/release-1.22.0/>, Calcite provides first-class hint parsing.)
>
> ------------------------------
> *From:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu <imj...@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity: even though I agree that having too many config classes is confusing, not knowing when the config values are used during pipeline setup is also pretty confusing.
> For example, the name 'TableConfig' makes me feel it's global to the whole tableEnvironment (which is true) and that it's only read once at execution (which is not true). Could we surface, or add some documentation on, when these configs are read? 😄
>
> Thank you so much!
> ------------------------------
> *From:* Jark Wu <imj...@gmail.com>
> *Sent:* Saturday, April 11, 2020 8:45 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
>
> Yes, that's right. Setting idleStateRetentionTime on TableConfig before translation should work.
>
> On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
>
> Thank you for answering! I was reading StreamTableEnvironmentImpl/StreamPlanner, and it seems to me that when converting tables to DataStreams, planner.translate takes the current tableConfig into account (as in, it reads the current tableConfig content even though the config is not explicitly passed in as an argument to translate). So it seems that if I set the tableConfig right before converting to DataStreams, that should work?
>
> Or did you mean the actual tableEnvironment.execute()? We have a whole pipeline with multiple queries that also depend on each other, so all the continuous queries have to execute concurrently.
>
> Thanks again!
> ------------------------------
> *From:* Jark Wu <imj...@gmail.com>
> *Sent:* Saturday, April 11, 2020 1:24 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> QueryConfig is deprecated and will be removed in the future, because having so many different config classes in the Table API is confusing.
> If you want to set different idleStateRetentionTime values for different queries, you can set a new idleStateRetentionTime on TableConfig before executing/submitting each query.
>
> Best,
> Jark
>
> On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
>
> Just looked into the source code a bit further and realized that in StreamTableEnvironmentImpl, translation is lazy even for sinks. Is there any way to give different transformations different queryConfigs?
> ------------------------------
> *From:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Sent:* Friday, April 10, 2020 6:46 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10
>
> Hello! I'm using the Table API to write a pipeline with multiple queries, and I want to set up different idleStateRetentionTime values for different queries.
>
> In Flink 1.8, it seems I could pass in a StreamQueryConfig when converting each output table into a DataStream, and the translation would take the idleStateRetentionTime into account.
>
> But in Flink 1.10, idleStateRetentionTime is actually set on TableConfig and applies to the whole tableEnvironment.
>
> Is there a way to have different idleStateRetentionTime values for different queries in 1.10?
>
> I saw that tableEnvironment.insertInto(sink, queryConfig) still allows eager translation. But does that mean that if I have multiple sinks for the same DataStream with different idleStateRetentionTime configurations, the transformation will be executed multiple times?
>
> Thank you!
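The custom IDLE_STATE_RETENTION_TIME hint in the example query above is not a built-in Flink feature; the framework described in the thread parses it itself before registering the table. A minimal sketch of that parsing step might look like the following. It assumes the exact hint syntax shown in the example and only `s`/`m`/`h` duration suffixes, and `RetentionHintParser` is a hypothetical helper, not part of Flink or Calcite.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of Flink or Calcite): extracts the custom
 * IDLE_STATE_RETENTION_TIME(minTime = '...', maxTime = '...') hint from a query.
 */
final class RetentionHintParser {

    /** Parsed min/max idle state retention for one query. */
    static final class Retention {
        final Duration min;
        final Duration max;

        Retention(Duration min, Duration max) {
            this.min = min;
            this.max = max;
        }
    }

    // Matches e.g. /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */
    private static final Pattern HINT = Pattern.compile(
        "/\\*\\+\\s*IDLE_STATE_RETENTION_TIME\\s*\\(\\s*minTime\\s*=\\s*'([^']+)'\\s*,"
            + "\\s*maxTime\\s*=\\s*'([^']+)'\\s*\\)\\s*\\*/",
        Pattern.CASE_INSENSITIVE);

    // Assumed value format: an integer followed by s, m or h (e.g. "5m", "2h").
    private static final Pattern VALUE = Pattern.compile("(\\d+)\\s*([smh])");

    static Duration parseDuration(String text) {
        Matcher m = VALUE.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "s": return Duration.ofSeconds(amount);
            case "m": return Duration.ofMinutes(amount);
            default:  return Duration.ofHours(amount); // only "h" can reach here
        }
    }

    /** Returns the hinted retention, or empty if the query carries no such hint. */
    static Optional<Retention> parse(String sql) {
        Matcher m = HINT.matcher(sql);
        if (!m.find()) {
            return Optional.empty();
        }
        Duration min = parseDuration(m.group(1));
        Duration max = parseDuration(m.group(2));
        if (max.compareTo(min) < 0) {
            throw new IllegalArgumentException("maxTime must not be smaller than minTime");
        }
        return Optional.of(new Retention(min, max));
    }
}
```

The framework could then convert the two durations (for example with `Time.milliseconds(retention.min.toMillis())` from `org.apache.flink.api.common.time.Time`) and pass them to `tableEnv.getConfig().setIdleStateRetentionTime(minTime, maxTime)` right before the `toRetractStream` call for that query, which is the eager-translation path Godfrey and Jark describe; `tableEnv` here is just an illustrative variable name. A regex keeps the sketch dependency-free; with the hint support that arrived in Calcite 1.22, a parser-based approach would likely be more robust.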
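Much of this thread turns on when TableConfig is actually read. The toy model below uses hypothetical classes that only mimic the timing described for Flink 1.10 (it is not the planner's actual code). It shows why setting the retention right before each `toRetractStream` call gives each query its own value, while sinks deferred by `insertInto` under multiple-sink optimization all see whatever value is set when `execute` runs.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: mimics WHEN a mutable, environment-wide config is read. */
final class ToyTableEnvironment {
    // Mutable and shared by every query, like TableConfig; ZERO is just the toy default.
    private Duration idleStateRetention = Duration.ZERO;
    private final List<String> pendingSinks = new ArrayList<>();
    // Query name -> retention value the query was actually translated with.
    final Map<String, Duration> translated = new LinkedHashMap<>();

    void setIdleStateRetention(Duration retention) {
        this.idleStateRetention = retention;
    }

    /** Eager path (like toRetractStream): the config is read immediately. */
    void toRetractStream(String query) {
        translated.put(query, idleStateRetention);
    }

    /** Lazy path (like insertInto with multiple-sink optimization): only buffered. */
    void insertInto(String query) {
        pendingSinks.add(query);
    }

    /** Buffered sinks are optimized together and see the config as it is now. */
    void execute() {
        for (String query : pendingSinks) {
            translated.put(query, idleStateRetention);
        }
        pendingSinks.clear();
    }
}
```

In this model, two queries converted eagerly with different retention values set in between keep their own values, whereas two lazily buffered sinks both end up with the last value set before `execute()`.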