Hi Jiahui,

Thanks for your suggestions.
I think we need a more detailed explanation of the behavior change.
Regarding "supporting query configuration using Hints", I think it is one
possible approach, but it needs more discussion.

Best,
Godfrey

On Tue, Apr 14, 2020 at 7:46 PM, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:

> Yep yep :) I’m aware that the difference here between the Blink and legacy
> Flink planners applies only to sinks.
>
> But since toDataStream doesn’t take a query-level config at the API level,
> it’s easy for people to think they can’t control it on a per-query basis
> without digging into the source code.
>
> I have two questions / suggestions here:
>
> 1. Since StreamQueryConfig is deprecated and we want to consolidate config
> classes, can we maybe add an additional endpoint like
> .toRetractStream(Table, Class, minRetentionTime, maxRetentionTime)? Or at
> least add some Javadocs so that I won’t have to worry about the behavior
> under the hood suddenly changing?
> 2. What do we think about making query configuration via Hints a
> first-class Flink feature?
>
> Thank you so much 😊
> ------------------------------
> *From:* godfrey he <godfre...@gmail.com>
> *Sent:* Tuesday, April 14, 2020 3:20 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* Jark Wu <imj...@gmail.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> I think this is a problem with multiple-sink optimization. If we optimize
> each sink eagerly (that is, we optimize the query when we call
> `writeToSink` or `insertInto`), `TableConfig#setIdleStateRetentionTime` is
> functionally equivalent to QueryConfig, provided that we call
> `TableConfig#setIdleStateRetentionTime` before calling `writeToSink` or
> `insertInto`. With multiple-sink optimization, however, it is hard to map
> the value of `TableConfig#setIdleStateRetentionTime` to each query. I
> think this is a common issue for per-query configuration under
> multiple-sink optimization.
>
> For the `toRetractStream` method, however, we keep the eager optimization
> strategy, so you can call `TableConfig#setIdleStateRetentionTime` before
> `toRetractStream`.
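
The difference between eager and lazy translation described above can be illustrated with a toy model. This is only a sketch: `ConfigTimingDemo`, `SharedConfig`, `eager`, and `lazy` are hypothetical names made up for illustration and are not Flink classes or methods. It only shows why a shared, mutable config maps cleanly to each query when it is read at hand-over time, and why only the last value survives when all queries are translated together later.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Flink APIs) of when a shared mutable config is read. */
final class ConfigTimingDemo {

    /** Stand-in for a shared config object holding one retention value. */
    static final class SharedConfig {
        long retentionMinutes;
    }

    /** Eager: each query is "translated" immediately, so it sees its own value. */
    static List<String> eager(SharedConfig config, String[] queries, long[] retention) {
        List<String> plans = new ArrayList<>();
        for (int i = 0; i < queries.length; i++) {
            config.retentionMinutes = retention[i];
            // The config is read right here, at hand-over time.
            plans.add(queries[i] + "@" + config.retentionMinutes + "m");
        }
        return plans;
    }

    /** Lazy: queries are only buffered; the config is read when all are translated together. */
    static List<String> lazy(SharedConfig config, String[] queries, long[] retention) {
        List<String> buffered = new ArrayList<>();
        for (int i = 0; i < queries.length; i++) {
            config.retentionMinutes = retention[i];
            buffered.add(queries[i]); // nothing is translated yet
        }
        List<String> plans = new ArrayList<>();
        for (String query : buffered) {
            // Every query now sees whatever value was set last.
            plans.add(query + "@" + config.retentionMinutes + "m");
        }
        return plans;
    }

    public static void main(String[] args) {
        String[] queries = {"q1", "q2"};
        long[] retention = {5, 60};
        System.out.println(eager(new SharedConfig(), queries, retention));
        System.out.println(lazy(new SharedConfig(), queries, retention));
    }
}
```

In this model the eager path keeps 5 minutes for `q1` and 60 minutes for `q2`, while the lazy path ends up with 60 minutes for both, which mirrors the mapping problem described for multiple-sink optimization.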
>
> Best,
> Godfrey
>
> On Tue, Apr 14, 2020 at 12:15 PM, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
>
> Hey Godfrey, in some of the use cases our users have, they have a couple
> of complex join queries where the key domains keep evolving. We definitely
> want some sort of state retention for those queries. But there are others
> where the key domain doesn't evolve over time, and there isn't really a
> guarantee on the maximum gap between two records of the same key
> appearing in the stream. We don't want to accidentally invalidate the
> state for those keys in these streams.
>
> Because queries with different requirements can both exist in the
> pipeline, I think we have to configure `IDLE_STATE_RETENTION_TIME` per
> operator.
>
> Just wondering, has a similar requirement not come up much for SQL users
> before? (being able to set table / query configuration inside SQL queries)
>
> We are also a little bit concerned because, now that
> 'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the
> fact that TableConfig is read during toDataStream feels like relying on an
> implementation detail that just happens to work, and there is no guarantee
> that it will keep working in future versions...
>
> Thanks!
> ------------------------------
> *From:* godfrey he <godfre...@gmail.com>
> *Sent:* Monday, April 13, 2020 9:51 PM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* Jark Wu <imj...@gmail.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> A query hint is one way to do fine-grained configuration.
> Just out of curiosity, is it a strong requirement that users need to
> configure a different IDLE_STATE_RETENTION_TIME for each operator?
>
> Best,
> Godfrey
>
> On Tue, Apr 14, 2020 at 2:07 AM, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
>
> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was that, for some of the streams, the key domain
> keeps evolving and we want to expire the state for older keys. But there
> is no easy way to let users configure their state timeout directly
> through SQL APIs.
> Currently we are asking users to configure idleStateRetentionTime in a
> custom SQL hint, then our framework parses it and sets it up at
> table registration time.
>
> An example query that users can write right now looks like this:
>
> CREATE TABLE `/output` AS
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
> FROM `/input1` a
> INNER JOIN `/input2` b
> ON a.column_name = b.column_name;
>
> Is this something Flink SQL may want to support out of the box? (Calcite
> has provided first-class hint parsing since 1.22.0
> <https://calcite.apache.org/news/2020/03/05/release-1.22.0/>.)
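
For illustration, a framework-side parser for a hint like the one in the query above might look roughly like the following. This is a minimal sketch under stated assumptions, not the framework's actual code and not a Flink or Calcite API: the class `RetentionHint`, its `parse` method, the regular expression, and the accepted unit suffixes (`s`, `m`, `h`, `d`) are all made up for this example.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink or Calcite) that extracts the custom hint. */
final class RetentionHint {

    // Matches e.g. /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */
    private static final Pattern HINT = Pattern.compile(
            "/\\*\\+\\s*IDLE_STATE_RETENTION_TIME\\s*\\(\\s*"
                    + "minTime\\s*=\\s*'(\\d+)([smhd])'\\s*,\\s*"
                    + "maxTime\\s*=\\s*'(\\d+)([smhd])'\\s*\\)\\s*\\*/");

    final Duration minTime;
    final Duration maxTime;

    private RetentionHint(Duration minTime, Duration maxTime) {
        this.minTime = minTime;
        this.maxTime = maxTime;
    }

    /** Returns the parsed hint, or an empty Optional if the query carries no such hint. */
    static Optional<RetentionHint> parse(String sql) {
        Matcher m = HINT.matcher(sql);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new RetentionHint(
                toDuration(m.group(1), m.group(2)),
                toDuration(m.group(3), m.group(4))));
    }

    // Assumed unit suffixes: s = seconds, m = minutes, h = hours, d = days.
    private static Duration toDuration(String amount, String unit) {
        long n = Long.parseLong(amount);
        switch (unit) {
            case "s": return Duration.ofSeconds(n);
            case "m": return Duration.ofMinutes(n);
            case "h": return Duration.ofHours(n);
            default:  return Duration.ofDays(n);
        }
    }

    public static void main(String[] args) {
        String sql = "SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ * "
                + "FROM `/input1` a INNER JOIN `/input2` b ON a.column_name = b.column_name";
        parse(sql).ifPresent(h -> System.out.println(h.minTime + " .. " + h.maxTime));
    }
}
```

The parsed values could then be applied with `TableConfig#setIdleStateRetentionTime` before the query is translated, which is the approach discussed in this thread.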
>
>
> ------------------------------
> *From:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu <imj...@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity, even though I agree that having too many config classes
> is confusing, not knowing when the config values are used during pipeline
> setup is also pretty confusing. For example, the name 'TableConfig'
> makes me feel it's global to the whole tableEnvironment (which is true) and
> only read once at execution (which is not true). Can we try to surface or
> add some documentation on when these configs are read? 😄
>
> Thank you so much!
> ------------------------------
> *From:* Jark Wu <imj...@gmail.com>
> *Sent:* Saturday, April 11, 2020 8:45 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Yes, that's right. Setting idleStateRetentionTime on TableConfig before
> translation should work.
>
> On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Thank you for answering! I was reading
> StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when
> converting tables to DataStreams, planner.translate takes the
> current tableConfig into account (as in, it reads the current tableConfig
> content even though it’s not explicitly passed in as an argument to
> translate). So it seems that if I set tableConfig right before converting
> to DataStreams, that should work?
>
> Or did you mean the actual tableEnvironment.execute()? We have a
> whole pipeline with multiple queries that also depend on each other, so we
> have to have all the continuous queries executing concurrently.
>
> Thanks again!
> ------------------------------
> *From:* Jark Wu <imj...@gmail.com>
> *Sent:* Saturday, April 11, 2020 1:24 AM
> *To:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> QueryConfig is deprecated and will be removed in the future, because it is
> confusing that the Table API has so many different config classes.
> If you want to set a different idleStateRetentionTime for different queries,
> you can set a new idleStateRetentionTime on TableConfig before
> executing/submitting the query.
>
> Best,
> Jark
>
> On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang <qzhzm173...@hotmail.com>
> wrote:
>
> Just looked into the source code a bit further and realized that
> StreamTableEnvironmentImpl also does translation lazily, even for sinks.
> Is there any way for different transformations to have different
> queryConfigs?
> ------------------------------
> *From:* Jiahui Jiang <qzhzm173...@hotmail.com>
> *Sent:* Friday, April 10, 2020 6:46 PM
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Setting different idleStateRetentionTime for different queries
> executed in the same TableEnvironment in Flink 1.10
>
> Hello! I'm using the Table API to write a pipeline with multiple queries,
> and I want to set up a different idleStateRetentionTime for different
> queries.
>
> In Flink 1.8, it seems I can pass in a
> streamQueryConfig when converting each output table into a DataStream, and
> the translation will take the idleStateRetentionTime into account.
>
> But in Flink 1.10, that idleStateRetentionTime actually gets set on
> TableConfig and applies to the tableEnvironment.
>
> Is there a way to have different idleStateRetentionTime for different
> queries in 1.10?
>
> I saw that tableEnvironment.insertInto(sink, queryConfig) still allows
> eager translation. But does that mean that if I have multiple sinks for the
> same DataStream with different idleStateRetentionTime configurations, the
> transformation will be executed multiple times?
>
> Thank you!
>
>
