[jira] [Created] (FLINK-36965) Allow re-creating the pod watch with many retries on k8s cluster failure
Yun Tang created FLINK-36965:
---------------------------------
Summary: Allow re-creating the pod watch with many retries on k8s cluster failure
Key: FLINK-36965
URL: https://issues.apache.org/jira/browse/FLINK-36965
Project: Flink
Issue Type: Improvement
Components: Deployment / Kubernetes
Affects Versions: 1.20.0
Reporter: Yun Tang

FLINK-33728 introduced a backoff strategy for creating the watch on pods. With it, we can set {{kubernetes.transactional-operation.max-retries}} to a very large value to tolerate a long k8s cluster downtime. However, two problems remain:

1. If we set {{kubernetes.transactional-operation.max-retries}} to 100+ so that the JobMaster does not crash during more than one hour of k8s cluster downtime, we also make {{FlinkKubeClient#checkAndUpdateConfigMap}} retry for much longer, which is unnecessary.
2. Moreover, creating the watch on pods is not a transactional operation, so the current config options {{kubernetes.transactional-operation.initial-retry-delay}} and {{kubernetes.transactional-operation.max-retry-delay}} are misleading.

Thus, I think we should introduce a new {{kubernetes.watch-operation.max-retries}} together with {{kubernetes.watch-operation.initial-retry-delay}} and {{kubernetes.watch-operation.max-retry-delay}}, and deprecate the previous two options.
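For context, the ticket boils down to a capped exponential backoff around watch creation. Below is a minimal sketch of that pattern, assuming the option names proposed above; it is an illustration, not the actual code in Flink's Kubernetes client.

{code:java}
import java.time.Duration;

/** Sketch of a capped exponential backoff for re-creating a pod watch.
 *  The constants stand in for the proposed (hypothetical) options
 *  kubernetes.watch-operation.{max-retries,initial-retry-delay,max-retry-delay}. */
public final class WatchRetrySketch {

    private static final int MAX_RETRIES = 100;
    private static final Duration INITIAL_RETRY_DELAY = Duration.ofSeconds(1);
    private static final Duration MAX_RETRY_DELAY = Duration.ofMinutes(1);

    interface WatchCreator {
        void createPodWatch() throws Exception;
    }

    static void createWatchWithRetries(WatchCreator creator) throws Exception {
        Duration delay = INITIAL_RETRY_DELAY;
        for (int attempt = 1; ; attempt++) {
            try {
                creator.createPodWatch();
                return; // watch established
            } catch (Exception e) {
                if (attempt >= MAX_RETRIES) {
                    throw e; // retries exhausted; the JobMaster would fail here
                }
                Thread.sleep(delay.toMillis());
                // double the delay, capped at the configured maximum
                Duration next = delay.multipliedBy(2);
                delay = next.compareTo(MAX_RETRY_DELAY) > 0 ? MAX_RETRY_DELAY : next;
            }
        }
    }
}
{code}

Decoupling the watch retry budget from the transactional-operation options is exactly what allows a large number of watch retries without slowing down {{FlinkKubeClient#checkAndUpdateConfigMap}}.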
[jira] [Created] (FLINK-36964) Fix potential exception when SchemaChange occurs in parallel with Paimon Sink
Yanquan Lv created FLINK-36964:
---------------------------------
Summary: Fix potential exception when SchemaChange occurs in parallel with Paimon Sink
Key: FLINK-36964
URL: https://issues.apache.org/jira/browse/FLINK-36964
Project: Flink
Issue Type: New Feature
Components: Flink CDC
Affects Versions: cdc-3.2.1, cdc-3.2.0
Reporter: Yanquan Lv
Fix For: cdc-3.3.0

When assigning bucket numbers in BucketAssignOperator, the SchemaChangeEvent was not broadcast downstream, which may cause the SchemaInfo in the PaimonWriter to be out of line with the DataChangeEvent.
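For readers unfamiliar with the operator, the fix amounts to replicating schema changes to every downstream channel while data changes keep bucket-based routing. Below is a self-contained toy model of that behavior; the class names are made up and are not the actual Flink CDC internals.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model: schema changes are broadcast to every downstream writer, while
 *  data changes keep their bucket routing. Names are illustrative only. */
public final class BucketAssignSketch {
    interface Event {}
    record DataChangeEvent(String key, String payload) implements Event {}
    record SchemaChangeEvent(String ddl) implements Event {}

    private final List<Consumer<Event>> downstreamChannels;

    BucketAssignSketch(List<Consumer<Event>> downstreamChannels) {
        this.downstreamChannels = downstreamChannels;
    }

    void processElement(Event event) {
        if (event instanceof SchemaChangeEvent) {
            // Broadcast: every writer must update its schema before matching
            // data arrives, otherwise its SchemaInfo is out of line with the data.
            downstreamChannels.forEach(channel -> channel.accept(event));
        } else if (event instanceof DataChangeEvent data) {
            // Data keeps normal bucket routing by key hash.
            int bucket = Math.floorMod(data.key().hashCode(), downstreamChannels.size());
            downstreamChannels.get(bucket).accept(event);
        }
    }

    public static void main(String[] args) {
        List<Consumer<Event>> channels = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            int id = i;
            channels.add(e -> System.out.println("writer-" + id + " got " + e));
        }
        BucketAssignSketch op = new BucketAssignSketch(channels);
        op.processElement(new SchemaChangeEvent("ADD COLUMN c INT")); // both writers
        op.processElement(new DataChangeEvent("k1", "row1"));          // one writer
    }
}
{code}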
Re: [DISCUSS] FLIP-492: Support Query Modifications for Materialized Tables.
Hi everyone,

The discussion has not received any responses for a while. If there are no further comments this week, I will close the discussion and initiate the vote next Monday. Thank you all for your input!

Best regards,
Feng Jin

On Fri, Dec 20, 2024 at 10:38 AM Feng Jin wrote:
> Hi Zuo and Lincoln,
>
> Thanks for your reply.
>
> @Zuo
>
> > detects whether the modified state is compatible or not with the previous state automatically?
>
> Automatic detection of state compatibility is technically feasible, but currently there is no ready-made interface to check whether the jobs before and after the modification are compatible. However, this issue may be beyond the scope of this FLIP. Regarding the topic of Flink SQL state compatibility, I believe a separate FLIP is needed to describe its behavior in more detail.
>
> @Lincoln
>
> > And for the default behavior of the alter operation under continuous mode, can you add an example of starting the new job with hints (similar to the case in the User Story section)?
>
> Thank you for the suggestion. Compatibility recovery is not easy for users to judge. I think removing it is reasonable, as it is not suitable to be provided as a public feature. I have already updated the relevant content.
>
> Best,
> Feng Jin
>
> On Thu, Dec 19, 2024 at 10:08 PM Lincoln Lee wrote:
>> Thanks Feng for driving this! Supporting modification is an important improvement for Materialized Table.
>>
>> Regarding the alter table reserving historic data, I have a similar question to Ron's: users can't easily judge whether a change is simple enough to keep state compatibility with the old refresh job under continuous mode. Therefore, I suggest removing the description of "Compatible Recovery" from the public interface section.
>>
>> And for the default behavior of the alter operation under continuous mode, can you add an example of starting the new job with hints (similar to the case in the User Story section)?
>>
>> Best,
>> Lincoln Lee
>>
>> Wei Zuo <1015766...@qq.com.invalid> wrote on Thu, Dec 19, 2024 at 14:05:
>>> Hi Feng,
>>>
>>> Is it possible that the framework detects whether the modified state is compatible or not with the previous state automatically? It would be better to recognize query state compatibility automatically.
>>>
>>> Best,
>>> Zuo Wei
>>>
>>> -------- Original Message --------
>>> From: "dev" <ron9@gmail.com>
>>> Sent: Thursday, December 19, 2024, 10:15 AM
>>> To: "Feng Jin"
>>> Cc: "dev" <lincoln.8...@gmail.com>
>>> Subject: Re: [DISCUSS] FLIP-492: Support Query Modifications for Materialized Tables.
>>>
>>> Hi Feng,
>>>
>>> The reply looks good to me. But I have one question: you mentioned the `DESC MATERIALIZED TABLE` syntax in the FLIP, but we haven't provided this syntax until now. I think we should add it to this FLIP if needed.
>>>
>>> Best,
>>> Ron
>>>
>>> Feng Jin wrote:
>>>> Hi Ron,
>>>>
>>>> Thanks for your reply.
>>>>
>>>> > Is it only possible to add columns at the end and not anywhere in the table schema? Some databases have this limitation; does lake storage such as Iceberg/Paimon have this limitation?
>>>>
>>>> Currently, we can restrict adding columns only to the end of the schema. Although both Paimon and Iceberg already support adding columns anywhere, there are still some systems that do not. I will include this in the FLIP.
>>>>
>>>> > In the Refresh Task Behavior section you mention partition hints; is it possible to clarify what they are in the FLIP?
>>>>
>>>> I have added the relevant details.
>>>>
>>>> > Are you able to articulate the default behavior?
>>>>
>>>> The detailed explanation for this part has been updated.
>>>>
>>>> > How can users determine if states are compatible?
>>>>
>>>> Users can only rely on their experience to make modifications. Currently, the Flink framework does not guarantee that changes to SQL logic will maintain state compatibility.
>>>>
>>>> I think we can add some suggestions in the user documentation in the future. While the framework itself cannot ensure state compatibility, some simple modification scenarios can indeed be compatible.
>>>>
>>>> For now, the responsibility is left to the users.
>>>>
>>>> Even if recovery ultimately fails, users still have the option to roll back to the original query or start consuming from a new offset by disabling recovery parameters.
>>>>
>>>> Best,
>>>> Feng
>>>>
>>>> On Tue, Dec 17, 2024 at 10:37 AM Ron Liu wrote:
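For readers following the discussion above, here is a hedged sketch of the kind of modification FLIP-492 targets: appending a column at the end of a materialized table's schema. The ALTER syntax follows the FLIP's proposal and is illustrative only; it assumes a catalog that supports materialized tables, and the table and column names are made up.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative only: the shape of a materialized-table query modification.
 *  The exact ALTER syntax is specified in FLIP-492. */
public class MaterializedTableAlterSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Original definition: a continuously refreshed materialized table.
        env.executeSql(
                "CREATE MATERIALIZED TABLE daily_orders "
                        + "FRESHNESS = INTERVAL '1' MINUTE "
                        + "AS SELECT order_id, amount FROM orders");

        // Modification discussed in the thread: append a column at the end
        // of the schema, the restricted form of schema change.
        env.executeSql(
                "ALTER MATERIALIZED TABLE daily_orders "
                        + "AS SELECT order_id, amount, customer_id FROM orders");
    }
}
{code}

Whether the refresh job can restore from the previous job's state after such a change is exactly the compatibility question debated above; the framework leaves that judgment to the user.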
[jira] [Created] (FLINK-36961) Wait for ForSt state executor shutdown when disposing
Yanfei Lei created FLINK-36961:
---------------------------------
Summary: Wait for ForSt state executor shutdown when disposing
Key: FLINK-36961
URL: https://issues.apache.org/jira/browse/FLINK-36961
Project: Flink
Issue Type: Sub-task
Reporter: Yanfei Lei
[jira] [Created] (FLINK-36962) Non-deterministic filter after stream join is pushed down to source by mistake
Shuai Xu created FLINK-36962:
---------------------------------
Summary: Non-deterministic filter after stream join is pushed down to source by mistake
Key: FLINK-36962
URL: https://issues.apache.org/jira/browse/FLINK-36962
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.0-preview
Reporter: Shuai Xu

A non-deterministic filter after a stream join is pushed down to the source by mistake. Modify org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following snippet of code:

{code:java}
@BeforeEach
def setup(): Unit = {
  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
  util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
  util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
}

@Test
def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
  val sqlQuery =
    "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 join SourceTable t2 on t1.b = t2.b) t " +
      "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
  util.verifyRelPlan(sqlQuery)
}
{code}

We expected the plan:

{code:java}
Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

but the actual plan is:

{code:java}
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
[jira] [Created] (FLINK-36963) Wrong context maintenance around `asyncProcessWithKey`
Zakelly Lan created FLINK-36963:
---------------------------------
Summary: Wrong context maintenance around `asyncProcessWithKey`
Key: FLINK-36963
URL: https://issues.apache.org/jira/browse/FLINK-36963
Project: Flink
Issue Type: Sub-task
Components: Runtime / Async State Processing
Reporter: Zakelly Lan
Assignee: Zakelly Lan

Within `asyncProcessWithKey`, the key/record context is switched to the one for the asynchronous procedure and then switched back. However, we picked the wrong old context and switched back to the wrong one.
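A toy model of the save-and-restore pattern the ticket describes is shown below; the names are simplified stand-ins, not Flink's actual async state processing internals.

{code:java}
/** Toy model of the context-switch bug: the "old" context must be captured
 *  BEFORE switching to the async one, and restored afterwards. A buggy
 *  variant restores a context captured at the wrong point, leaking the
 *  async context into subsequent processing. */
public final class ContextSwitchSketch {
    private Object currentContext = "mailbox-context";

    void asyncProcessWithKey(Object asyncContext, Runnable action) {
        // Correct order: capture the previous context before the switch.
        Object previous = currentContext;
        currentContext = asyncContext;
        try {
            action.run();
        } finally {
            currentContext = previous; // switch back to the right old context
        }
    }

    public static void main(String[] args) {
        ContextSwitchSketch sketch = new ContextSwitchSketch();
        sketch.asyncProcessWithKey(
                "async-context",
                () -> System.out.println("running with " + sketch.currentContext));
        System.out.println("restored to " + sketch.currentContext); // mailbox-context
    }
}
{code}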
Re: [DISCUSS] FLIP-496: SQL connector for keyed state data
If I understand correctly, you use purely SQL workloads for savepoint creation. In short, in such a case the initial version won't be of much help. In more detail, SQL operators create non-keyed/operator state data, which is going to be supported later on. The first version supports user-code-defined keyed state data.

BR,
G

On Tue, Dec 24, 2024 at 4:45 AM Venkatakrishnan Sowrirajan wrote:
> Hi Gabor,
>
> > - Expose state metadata on data stream API
> > - Add metadata support for the SQL connector
> > - Add non-keyed/operator state data support for the SQL connector
> > - Add state catalog
>
> Thanks for this FLIP. This will be a great addition. Although most of our use cases are SQL and not DataStream, so it seems we only benefit once "non-keyed/operator state data support for the SQL connector" is added. Is my understanding correct?
>
> Regards,
> Venkata krishnan
>
> On Mon, Dec 23, 2024 at 6:59 PM Shengkai Fang wrote:
>> Thanks for your clarification. Looking forward to the following FLIPs.
>>
>> Best,
>> Shengkai
>>
>> Gabor Somogyi wrote on Mon, Dec 23, 2024 at 18:32:
>>> Hi Shengkai,
>>>
>>> Thanks for your comments.
>>>
>>> Both the state catalog and metadata tables are on the roadmap but not in this FLIP. The main approach is to do things incrementally in order to avoid infinite discussion threads and 10k+ line pull requests.
>>>
>>> Just to give some teaser, this is how I roughly imagine the upcoming FLIPs (unordered):
>>> - Expose state metadata on data stream API
>>> - Add metadata support for the SQL connector
>>> - Add non-keyed/operator state data support for the SQL connector
>>> - Add state catalog
>>>
>>> BR,
>>> G
>>>
>>> On Mon, Dec 23, 2024 at 2:46 AM Shengkai Fang wrote:
>>>> Hi,
>>>>
>>>> Thanks for Gabor's great FLIP. It is very useful for many cases. But I have some suggestions about this FLIP:
>>>>
>>>> 1. It's better to introduce a state catalog that helps users translate the state information to a table schema. The state catalog uses ``.`` to find the required state and uses the state descriptor to restore the schema.
>>>> 2. Can the connector provide more metadata information for users? For example, the `ttl` field is very useful in our cases.
>>>>
>>>> Best,
>>>> Shengkai
>>>>
>>>> Gabor Somogyi wrote on Thu, Dec 19, 2024 at 21:41:
>>>>> Hi Yanquan,
>>>>>
>>>>> Thanks for the question; please see the related FLIP content.
>>>>>
>>>>> ---BEGIN FLIP CONTENT---
>>>>> The target of this implementation proposal is to provide SQL support for keyed states. Other types are going to be covered in further FLIP documents.
>>>>> ---END FLIP CONTENT---
>>>>>
>>>>> Keyed state data is defined by users in, for example, process functions; operator or non-keyed state data is defined by internal Flink operators, for example sources/sinks. In short, we intend to add this support later on, which means that further configuration parameters need to be added.
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>> On Thu, Dec 19, 2024 at 2:12 PM Yanquan Lv wrote:
>>>>>> Hi, Gabor.
>>>>>>
>>>>>> Thanks for driving this FLIP; it immediately reminded me of exploring the status of a Source connector like Kafka or MySQL CDC to understand the progress of reading information.
>>>>>>
>>>>>> In SourceOperator, the state of the reader is contained in a ListState named `SourceReaderState` [1]; however, the state type of `SourceReaderState` is `byte[]`, so I am worried that we can't get any understandable information using this connector. Do we consider supporting this scenario?
>>>>>>
>>>>>> If we want to support this scenario, perhaps we need to add more parameters like fields.#.state-version-serializer and fields.#.state-version to specify the org.apache.flink.core.io.SimpleVersionedSerializer [2] and the version to be used, and it would also require a more complex implementation.
>>>>>>
>>>>>> Do I have any misunderstandings? Is this requirement something we need to support now, or will it need support in the future?
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/9e38f32889e097f5a9b81f949643a2b037c1035c/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L109
>>>>>> [2] https://ur
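Following up on the `SourceReaderState` question above: if the user supplied the matching serializer, as the suggested fields.#.state-version-serializer option would require, the opaque `byte[]` could be decoded. A sketch under that assumption is below; the serializer here is a stand-in, and whether reader state actually carries a version prefix depends on how it was written.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Sketch: decoding an opaque state cell with a user-supplied
 *  SimpleVersionedSerializer. A real serializer would come from the
 *  source connector, e.g. its split serializer. */
public class ReaderStateDecodeSketch {

    static final SimpleVersionedSerializer<String> SPLIT_SERIALIZER =
            new SimpleVersionedSerializer<String>() {
                @Override
                public int getVersion() {
                    return 1;
                }

                @Override
                public byte[] serialize(String split) {
                    return split.getBytes(StandardCharsets.UTF_8);
                }

                @Override
                public String deserialize(int version, byte[] serialized) {
                    return new String(serialized, StandardCharsets.UTF_8);
                }
            };

    public static void main(String[] args) throws IOException {
        // Simulate what a connector would find in a ListState<byte[]> cell:
        byte[] stored = SimpleVersionedSerialization.writeVersionAndSerialize(
                SPLIT_SERIALIZER, "topic-0:offset=42");

        // With the serializer known, the opaque bytes become readable again.
        String split = SimpleVersionedSerialization.readVersionAndDeSerialize(
                SPLIT_SERIALIZER, stored);
        System.out.println(split);
    }
}
{code}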