[jira] [Created] (FLINK-36965) Allow re-creating the pod watch with many retries on k8s cluster failure

2024-12-25 Thread Yun Tang (Jira)
Yun Tang created FLINK-36965:


 Summary: Allow re-creating the pod watch with many retries on 
k8s cluster failure
 Key: FLINK-36965
 URL: https://issues.apache.org/jira/browse/FLINK-36965
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.20.0
Reporter: Yun Tang


FLINK-33728 introduced a backoff strategy when creating the watch on pods. With 
it, we can set {{kubernetes.transactional-operation.max-retries}} to a very 
large value to tolerate a long k8s cluster downtime. However, there still exist 
two problems:
1. If we set {{kubernetes.transactional-operation.max-retries}} to {{100}}+, 
meaning we hope the JobMaster will not crash and can tolerate more than one 
hour of k8s cluster downtime, this also makes 
{{FlinkKubeClient#checkAndUpdateConfigMap}} retry for much longer, which is not 
necessary.
2. Moreover, creating the watch on pods is not a transactional operation, so 
the current config options 
{{kubernetes.transactional-operation.initial-retry-delay}} and 
{{kubernetes.transactional-operation.max-retry-delay}} are misleading.

Thus, I think we should introduce a new 
{{kubernetes.watch-operation.max-retries}} together with 
{{kubernetes.watch-operation.initial-retry-delay}} and 
{{kubernetes.watch-operation.max-retry-delay}}, and deprecate the previous two 
options for the watch code path, as sketched below.
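For illustration, a minimal sketch of how the proposed options could be defined 
with Flink's {{ConfigOptions}} builder. The option keys come from this proposal; 
the defaults and descriptions are illustrative assumptions only:
{code:java}
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class KubernetesWatchOptions {

    // Maximum number of attempts to (re-)create the pod watch. A large value lets
    // the JobMaster survive a long k8s outage without inflating the retry budget
    // of transactional ConfigMap operations.
    public static final ConfigOption<Integer> WATCH_MAX_RETRIES =
            ConfigOptions.key("kubernetes.watch-operation.max-retries")
                    .intType()
                    .defaultValue(5) // illustrative default
                    .withDescription("Max attempts to re-create the pod watch on failure.");

    // Initial delay of the exponential backoff between watch re-creation attempts.
    public static final ConfigOption<Duration> WATCH_INITIAL_RETRY_DELAY =
            ConfigOptions.key("kubernetes.watch-operation.initial-retry-delay")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1)) // illustrative default
                    .withDescription("Initial backoff delay between watch re-creation attempts.");

    // Upper bound on the backoff delay between watch re-creation attempts.
    public static final ConfigOption<Duration> WATCH_MAX_RETRY_DELAY =
            ConfigOptions.key("kubernetes.watch-operation.max-retry-delay")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(1)) // illustrative default
                    .withDescription("Maximum backoff delay between watch re-creation attempts.");
}
{code}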




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36964) Fix potential exception when SchemaChange happens in parallel with Paimon Sink.

2024-12-25 Thread Yanquan Lv (Jira)
Yanquan Lv created FLINK-36964:
--

 Summary: Fix potential exception when SchemaChange happens in 
parallel with Paimon Sink.
 Key: FLINK-36964
 URL: https://issues.apache.org/jira/browse/FLINK-36964
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Affects Versions: cdc-3.2.1, cdc-3.2.0
Reporter: Yanquan Lv
 Fix For: cdc-3.3.0


When assigning the bucket number in BucketAssignOperator, the SchemaChangeEvent was 
not broadcast to downstream operators, which may cause the SchemaInfo in PaimonWriter 
to be out of line with the DataChangeEvent. A sketch of the intended routing is 
shown below.
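For illustration, a minimal, self-contained sketch of the intended routing; the 
event types and the emitter below are hypothetical stand-ins, not the real Flink 
CDC operator API:
{code:java}
import java.util.function.BiConsumer;

// Hypothetical stand-ins for the Flink CDC event types; not the real API.
interface Event {}
final class SchemaChangeEvent implements Event {}
final class DataChangeEvent implements Event {
    final int key;
    DataChangeEvent(int key) { this.key = key; }
}

final class BucketAssignSketch {
    private final int parallelism;
    // emitter.accept(channel, event) stands in for emitting to a downstream subtask.
    private final BiConsumer<Integer, Event> emitter;

    BucketAssignSketch(int parallelism, BiConsumer<Integer, Event> emitter) {
        this.parallelism = parallelism;
        this.emitter = emitter;
    }

    void processElement(Event event) {
        if (event instanceof SchemaChangeEvent) {
            // A schema change must reach EVERY downstream writer; routing it to a
            // single bucket channel is the bug: the other writers keep a stale
            // SchemaInfo and later DataChangeEvents no longer match it.
            for (int channel = 0; channel < parallelism; channel++) {
                emitter.accept(channel, event);
            }
        } else if (event instanceof DataChangeEvent) {
            // Data changes keep the normal hash routing to their bucket's writer.
            int bucket = Math.floorMod(((DataChangeEvent) event).key, parallelism);
            emitter.accept(bucket, event);
        }
    }
}
{code}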



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-492: Support Query Modifications for Materialized Tables.

2024-12-25 Thread Feng Jin
Hi, everyone,

The discussion has not received any responses for a while. If there are no
further comments this week, I will close the discussion and initiate the
vote next Monday.

Thank you all for your input!


Best regards,
Feng Jin


On Fri, Dec 20, 2024 at 10:38 AM Feng Jin  wrote:

> Hi
> Zuo and Lincoln
>
> Thanks for your reply.
>
> @Zuo
>
> > detects whether the modified state is compatible or not with the
> > previous state automatically?
>
>
> Automatic detection of state compatibility is technically feasible, but
> currently, there is no ready-made interface to check whether the job before
> and after modification is compatible. However, this issue may be beyond the
> scope of this FLIP. Regarding the topic of Flink SQL state compatibility, I
> believe a separate FLIP is needed to describe its behavior in more detail.
>
>
>
>
> @Lincoln
>
>
> > And for the default behavior of the alter operation under continuous mode,
> > can you add an example of starting the new job with hints (similar to the
> > case in the User Story section)?
>
>
> Thank you for the suggestion. Compatibility of recovery is indeed not easy
> for users to judge. I think removing it is reasonable, as it is not suitable
> to be provided as a public feature. I have already updated the relevant
> content.
>
>
>
> Best,
> Feng Jin
>
>
> On Thu, Dec 19, 2024 at 10:08 PM Lincoln Lee 
> wrote:
>
>> Thanks Feng for driving this!
>> Supporting modification is an important improvement for Materialized
>> Table.
>>
>> Regarding the alter table reserving historic data, I have a similar question
>> to Ron's: users can't easily judge whether a change is simple enough to keep
>> state compatibility with the old refresh job under the continuous mode.
>> Therefore, I suggest removing the description of “Compatible Recovery” from
>> the public interface section.
>>
>> And for the default behavior of the alter operation under continuous mode,
>> can you add an example of starting the new job with hints (similar to the
>> case in the User Story section)?
>>
>> Best,
>> Lincoln Lee
>>
>>
>> Wei Zuo <1015766...@qq.com.invalid> wrote on Thu, Dec 19, 2024, 14:05:
>>
>> > Hi, Feng
>> >
>> >
>> > Is it possible that the framework detects whether the modified state is
>> > compatible or not with the previous state automatically? It would be
>> > better to recognize query state compatibility automatically.
>> >
>> >
>> > Best,
>> >
>> >
>> > Zuo Wei
>> >
>> >
>> >
>> >
>> > -- Original message --
>> > From: "dev" <ron9@gmail.com>;
>> > Sent: Thursday, December 19, 2024, 10:15 AM
>> > To: "Feng Jin";
>> > Cc: "dev" <lincoln.8...@gmail.com>;
>> > Subject: Re: [DISCUSS] FLIP-492: Support Query Modifications for
>> > Materialized Tables.
>> >
>> >
>> >
>> > Hi, Feng
>> >
>> > The reply looks good to me. But I have one question: you mentioned the
>> > `DESC MATERIALIZED TABLE` syntax in the FLIP, but we haven't provided this
>> > syntax until now. I think we should add it to this FLIP if needed.
>> >
>> > Best,
>> > Ron
>> >
>> > Feng Jin wrote:
>> > > Hi Ron
>> > >
>> > > Thanks for your reply.
>> > >
>> > > > Is it only possible to add columns at the end and not anywhere in
>> > > > table schema, some databases have this limitation, does lake storage
>> > > > such as Iceberg/Paimon have this limitation?
>> > >
>> > >
>> > > Currently, we can restrict adding columns only to the end of the schema.
>> > > Although both Paimon and Iceberg already support adding columns anywhere,
>> > > there are still some systems that do not. I will include this in the FLIP.
>> > >
>> > >
>> > > > In the Refresh Task Behavior section you mention partition hints, is it
>> > > > possible to clarify what it is in the FLIP?
>> > >
>> > >
>> > > I have added the relevant details.
>> > >
>> > >
>> > > > Are you able to articulate the default behavior?
>> > >
>> > >
>> > > The detailed explanation for this part has been updated.
>> > >
>> > >
>> > > > How can users determine if states are compatible?
>> > >
>> > >
>> > > Users can only rely on their experience to make modifications. Currently,
>> > > the Flink framework does not guarantee that changes to SQL logic will
>> > > maintain state compatibility.
>> > >
>> > > I think we can add some suggestions in the user documentation in the
>> > > future. While the framework itself cannot ensure state compatibility,
>> > > some simple modification scenarios can indeed be compatible.
>> > >
>> > > For now, the responsibility is left to the users.
>> > >
>> > >
>> > > Even if recovery ultimately fails, users still have the option to roll
>> > > back to the original query or start consuming from a new offset by
>> > > disabling recovery parameters.
>> > >
>> > >
>> > >
>> > >
>> > > Best,
>> > > Feng
>> > >
>> > >
>> > > On Tue, Dec 17, 2024 at 10:37 AM Ron Liu wrote:
>>

[jira] [Created] (FLINK-36961) Wait ForSt state executor shutdown when disposing

2024-12-25 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-36961:
--

 Summary: Wait ForSt state executor shutdown when disposing
 Key: FLINK-36961
 URL: https://issues.apache.org/jira/browse/FLINK-36961
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36962) Non-deterministic filter after stream join pushed down to source by mistake

2024-12-25 Thread Shuai Xu (Jira)
Shuai Xu created FLINK-36962:


 Summary: Non-deterministic filter after stream join pushed down 
to source by mistake
 Key: FLINK-36962
 URL: https://issues.apache.org/jira/browse/FLINK-36962
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 2.0-preview
Reporter: Shuai Xu


A non-deterministic filter after a stream join is pushed down to the source by mistake.

The issue can be reproduced by modifying 
org.apache.flink.table.planner.plan.stream.sql.CalcTest with the following 
snippet of code.
 
{code:java}
@BeforeEach
def setup(): Unit = {
  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
  util.addTableSource[(Long, Int, String)]("SourceTable", 'a, 'b, 'c)
  util.addTemporarySystemFunction("random_udf", new NonDeterministicUdf)
}

@Test
def testCalcWithNonDeterministicFilterAfterJoin(): Unit = {
  val sqlQuery =
    "SELECT a FROM (SELECT t1.a, t1.c as t1c, t2.c as t2c FROM MyTable t1 join SourceTable t2 on t1.b = t2.b) t " +
      "WHERE TO_TIMESTAMP(t.t1c, 'yyyy-MM-dd HH:mm:ss') > TIMESTAMPADD(HOUR, -2, NOW()) and t.t2c > '2022-01-01 00:00:00'"
  util.verifyRelPlan(sqlQuery)
}
{code}
We expected the plan to be:
{code:java}
Calc(select=[a], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, c, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
but the actual plan is:
{code:java}
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=(b, b0)], select=[a, b, b0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[b]])
   :  +- Calc(select=[a, b], where=[>(TO_TIMESTAMP(c, 'yyyy-MM-dd HH:mm:ss'), +(NOW(), -7200000:INTERVAL HOUR))])
   :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[b]])
      +- Calc(select=[b], where=[>(c, '2022-01-01 00:00:00')])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, SourceTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
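A plausible shape of the fix, sketched below: a filter push-down/transpose rule 
should test predicate determinism before moving a condition below the join. 
Calcite's {{RexUtil.isDeterministic}} performs such a check; where exactly this 
guard belongs in Flink's planner rules is an assumption here:
{code:java}
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;

// Sketch: before transposing a filter below a join (or into a source scan),
// a rule should keep non-deterministic conjuncts above the join, because
// evaluating NOW() earlier changes the query's semantics.
final class PushdownGuard {
    static boolean canPushDown(RexNode condition) {
        // RexUtil.isDeterministic returns false for calls to non-deterministic
        // functions such as NOW() or a random UDF.
        return RexUtil.isDeterministic(condition);
    }
}
{code}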



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36963) Wrong context maintenance around `asyncProcessWithKey`

2024-12-25 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-36963:
---

 Summary: Wrong context maintenance around `asyncProcessWithKey`
 Key: FLINK-36963
 URL: https://issues.apache.org/jira/browse/FLINK-36963
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Async State Processing
Reporter: Zakelly Lan
Assignee: Zakelly Lan


Within `asyncProcessWithKey`, the key/record context is switched to the one for 
the asynchronous procedure and then switched back. However, we picked the wrong 
old context and switched back to the wrong one. A sketch of the intended 
save/restore discipline is given below.
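For illustration only, a minimal, self-contained sketch of that save/restore 
discipline; `RecordContext` and the surrounding class are hypothetical stand-ins, 
not the real async state processing internals:
{code:java}
// Hypothetical sketch; RecordContext and this class are stand-ins, not the
// real async state processing internals.
final class ContextSwitchSketch<K> {

    static final class RecordContext<K> {
        final K key;
        RecordContext(K key) { this.key = key; }
    }

    private RecordContext<K> currentContext;

    void asyncProcessWithKey(K key, Runnable action) {
        // Capture the old context BEFORE installing the new one; the reported
        // bug is picking the wrong "old" context to restore afterwards.
        final RecordContext<K> previous = currentContext;
        currentContext = new RecordContext<>(key);
        try {
            action.run();
        } finally {
            // Restore exactly what the caller had, even if the action throws.
            currentContext = previous;
        }
    }
}
{code}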



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-496: SQL connector for keyed state data

2024-12-25 Thread Gabor Somogyi
If I understand correctly, you use purely SQL workloads for savepoint
creation. In short, in such a case the initial version won't be of much help.

In more detail, SQL operators create non-keyed/operator state data, which is
going to be supported later on. The first version supports keyed state data
defined in user code, like the sketch below.
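To illustrate what "keyed state data defined in user code" means, here is a
minimal example of the kind of state the first version targets (standard
DataStream API; the class and state name are illustrative):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts events per key in a ValueState named "count"; a savepoint of this
// job contains user-defined keyed state that the first version of the SQL
// connector is meant to read.
public class CountPerKey extends KeyedProcessFunction<Long, Long, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1L;
        count.update(updated);
        out.collect(updated);
    }
}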

BR,
G


On Tue, Dec 24, 2024 at 4:45 AM Venkatakrishnan Sowrirajan 
wrote:

> Hi Gabor,
>
> > - Expose state metadata on data stream API
> > - Add metadata support for the SQL connector
> > - Add non-keyed/operator state data support for the SQL connector
> > - Add state catalog
>
>
> Thanks for this FLIP. This will be a great addition. However, most of our
> use cases are SQL and not DataStream, and it seems SQL will only benefit once
> "non-keyed/operator state data support for the SQL connector" is added.
> Is my understanding correct?
>
> Regards
> Venkata krishnan
>
>
> On Mon, Dec 23, 2024 at 6:59 PM Shengkai Fang  wrote:
>
> > Thanks for your clarification. Looking forward to the following FLIPs.
> >
> > Best,
> > Shengkai
> >
> > Gabor Somogyi wrote on Mon, Dec 23, 2024, 18:32:
> >
> > > Hi Shengkai,
> > >
> > > Thanks for your comments.
> > >
> > > Both state catalog and metadata tables are on the roadmap but not in
> this
> > > FLIP.
> > > The main approach is to do things incrementally in order to avoid
> > > infinite discussion threads and 10k+ line pull requests.
> > >
> > > Just to give a teaser, this is how I roughly imagine the upcoming FLIPs
> > > (unordered):
> > > - Expose state metadata on data stream API
> > > - Add metadata support for the SQL connector
> > > - Add non-keyed/operator state data support for the SQL connector
> > > - Add state catalog
> > >
> > > BR,
> > > G
> > >
> > >
> > > On Mon, Dec 23, 2024 at 2:46 AM Shengkai Fang 
> wrote:
> > >
> > > > Hi.
> > > >
> > > > Thanks Gabor's great FLIP. It is very useful for many cases. But I
> have
> > > > some suggestions about this FLIP:
> > > >
> > > > 1. It's better to introduce a state Catalog that helps users translate
> > > > the state information to a table schema. The state catalog uses
> > > > ``.`` to find the required state and uses the state
> > > > descriptor to restore the schema.
> > > > 2. Can the connector provide more metadata information for users? For
> > > > example, the `ttl` field is very useful in our cases.
> > > >
> > > > Best,
> > > > Shengkai
> > > >
> > > >
> > > >
> > > >
> > > > Gabor Somogyi wrote on Thu, Dec 19, 2024, 21:41:
> > > >
> > > > > Hi Yanquan,
> > > > >
> > > > > Thanks for the question, please see the related FLIP content:
> > > > >
> > > > > ---BEGIN FLIP CONTENT---
> > > > > The target of this implementation proposal is to provide SQL support
> > > > > for keyed states. Other types are going to be covered in further FLIP
> > > > > documents.
> > > > > ---END FLIP CONTENT---
> > > > >
> > > > > Keyed state data is defined by users, for example in process
> > > > > functions, while operator (non-keyed) state data is defined by internal
> > > > > Flink operators, for example sources/sinks.
> > > > > In short, we intend to add this support later on, which means that
> > > > > further configuration parameters need to be added.
> > > > >
> > > > > BR,
> > > > > G
> > > > >
> > > > >
> > > > > On Thu, Dec 19, 2024 at 2:12 PM Yanquan Lv 
> > > wrote:
> > > > >
> > > > > > Hi, Gabor. Thanks for driving this FLIP; it immediately reminded me
> > > > > > of exploring the state of a source connector like Kafka or MySQL CDC
> > > > > > to understand the reading progress.
> > > > > >
> > > > > > In SourceOperator, the state of the reader is contained in a ListState
> > > > > > named `SourceReaderState`[1]; however, the state type of
> > > > > > `SourceReaderState` is `byte[]`, so I am worried that we can't get any
> > > > > > understandable information using this connector. Do we consider
> > > > > > supporting this scenario?
> > > > > >
> > > > > > If we want to support this scenario, perhaps we need to add more
> > > > > > parameters like fields.#.state-version-serializer and
> > > > > > fields.#.state-version to specify the
> > > > > > org.apache.flink.core.io.SimpleVersionedSerializer[2]
> > > > > > and the version to be used, and it also requires a more complex
> > > > > > implementation.
> > > > > >
> > > > > > Do I have any misunderstandings? Is this requirement something we
> > > > > > need to support now, or will it be supported in the future?
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > > https://urldefense.com/v3/__https://github.com/apache/flink/blob/9e38f32889e097f5a9b81f949643a2b037c1035c/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java*L109__;Iw!!IKRxdwAv5BmarQ!f-7abNJ6_TcTJTEJmqsVdk7k8XT3k4e-UlBe4cOzZ7-NvL83cup9bQB-HX0VKoaxPlbZyQ7rRrg6KTlxdJI$
> > > > > > [2]
> > > > > > https://ur