[jira] [Created] (FLINK-35926) During rescale, jobmanager has incorrect judgment logic for the max parallelism.

2024-07-30 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-35926:
--

 Summary: During rescale, jobmanager has incorrect judgment logic 
for the max parallelism.
 Key: FLINK-35926
 URL: https://issues.apache.org/jira/browse/FLINK-35926
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.19.1
 Environment: flink-1.19.1

There is a high probability that 1.18 has the same problem
Reporter: yuanfenghu
 Attachments: image-2024-07-30-14-56-48-931.png, 
image-2024-07-30-14-59-26-976.png, image-2024-07-30-15-00-28-491.png

When I was using the adaptive scheduler and modified the job's parallelism 
through the REST API, incorrect decision logic caused the job to fail.
h2. Reproduce:
When I start a simple job with a parallelism of 128, the Max Parallelism of the 
job is set to 256 (through Flink's internal calculation logic). I then take a 
savepoint, change the parallelism of the job to 1, and restore the job from the 
savepoint. At this point, the Max Parallelism of the job is still 256:
 
!image-2024-07-30-14-56-48-931.png!

 
This is as expected. I then call the REST API to increase the parallelism to 
129 (which should be reasonable, since 129 < 256, the max parallelism), but the 
job throws an exception after restarting:
 
!image-2024-07-30-14-59-26-976.png!
Viewing the job's details at this point shows that the Max Parallelism has 
changed to 128:
 
!image-2024-07-30-15-00-28-491.png!

 
This can be reproduced reliably locally.
 
h3. Causes:
 
In the AdaptiveScheduler we recalculate the job's `VertexParallelismStore`, 
which results in the restarted job having the wrong max parallelism.

This seems to be related to FLINK-21844 and FLINK-22084.
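
For context, here is a minimal sketch of Flink's default max-parallelism calculation (as far as I can tell, mirroring {{KeyGroupRangeAssignment#computeDefaultMaxParallelism}}; the 128/32768 bounds are assumed to be that class's lower/upper defaults). It illustrates why recomputing the `VertexParallelismStore` turns the original 256 into 128 once the job runs with parallelism 1:
{code:java}
public class MaxParallelismSketch {

    // Smallest power of two >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x > 1 ? Integer.highestOneBit(x - 1) << 1 : 1;
    }

    // Sketch of KeyGroupRangeAssignment#computeDefaultMaxParallelism:
    // clamp(roundUpToPowerOfTwo(p + p / 2), 128, 32768). Bounds are assumptions.
    static int computeDefaultMaxParallelism(int parallelism) {
        return Math.min(
                Math.max(roundUpToPowerOfTwo(parallelism + (parallelism / 2)), 128),
                32768);
    }

    public static void main(String[] args) {
        System.out.println(computeDefaultMaxParallelism(128)); // 256 (initial submission)
        System.out.println(computeDefaultMaxParallelism(1));   // 128 (recomputed after rescaling to 1)
    }
}
{code}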



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-30 Thread Zdenek Tison
Hi all,

Based on the discussion, I added a new configuration:
*jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*.
We considered the following options for the default value:

   1. Use a separate default value, e.g., 60s.
   2. Fallback to
   *jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
   3. Use the value from
   *jobmanager.adaptive-scheduler.scaling-interval.max.*
   4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).

We decided against option 2) because, as discussed in the mailing list, the
value can be too low. Option 3 was also ruled out since it can be too high
or unset, and *scaling-interval.max* serves a
different use case (it works well with *parallelism-increase*). Option 4
was not chosen because it would affect existing jobs after migration. After
migrating to the new Flink version, rescaling would only happen if the
desired resources were available. However, rescaling happened with every
resource change before migration.

Therefore, I prefer a new default value: 60s.


Additionally, we reviewed the current set of parameters and think there is
a chance to align the parameters with their functionality in the 2.0
release. So, we propose these parameters:
*jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout*
*jobmanager.adaptive-scheduler.submission.resource-wait-timeout*

*jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling*
*jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*
*jobmanager.adaptive-scheduler.executing.rescale-trigger.max-checkpoint-failures*
*jobmanager.adaptive-scheduler.executing.rescale-trigger.max-delay*
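
To illustrate, a minimal sketch of how the new executing-state option with the
proposed 60s default could be declared (the option name is from this proposal;
where it lives and its description are assumptions on my side):

    import java.time.Duration;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    // Sketch only; final declaration would live wherever the FLIP decides.
    public static final ConfigOption<Duration> EXECUTING_RESOURCE_STABILIZATION_TIMEOUT =
            ConfigOptions.key(
                            "jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(60))
                    .withDescription(
                            "How long the Executing state waits for resources to"
                                    + " stabilize before rescaling.");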

Link to the updated FLIP doc.


Thanks a lot.

Regards,
Zdenek

On Wed, Jul 24, 2024 at 2:22 PM Zdenek Tison  wrote:

> Hi Gyula,
>
> Thank you for reviewing the document and providing feedback.
>
>1. I agree that we need two separate parameters for stabilization
>intervals in different states. I will update the FLIP document accordingly.
>2. That's correct. We reached the same conclusion while prototyping
>the implementation. I will add a new bullet point to the FLIP document.
>
> Thanks a lot.
>
> Regards,
> Zdenek
>
>
> On Tue, Jul 23, 2024 at 3:02 PM Gyula Fóra  wrote:
>
>> Hi All!
>>
>> Thank you for the proposal, I think it will be great to simplify the
>> current rescaling flow to make it more digestible :)
>>
>> I have 2 comments:
>>
>> 1. Related to what Matthias already pointed out, I think in production
>> scenarios it may be a typical requirement to have a fairly short
>> stabilization interval for job startup (reduce downtime) but overall a
>> longer stabilization period for Executing jobs before rescaling to avoid
>> fluctuations and therefore reduce downtime. I think it would be very
>> important to have 2 configs for that, one could fall back to the other of
>> course if undefined.
>>
>> 2. The document mentions that the stabilization period for executing jobs
>> is measured from the first resource event. I feel that if, after the
>> stabilization period, we don't have sufficient resources, we should
>> completely reset this timer and start the timeout from 0 when the next
>> event arrives. This would be more in line with the concept of
>> stabilization; otherwise, if you receive a batch of new resources, you may
>> not utilize them because we rescale immediately as soon as resources are
>> sufficient.
>>
>> Cheers,
>> Gyula
>>
>>
>>
>> On Thu, Jul 18, 2024 at 9:58 AM Zdenek Tison wrote:
>>
>> > Thanks, Matthias, for your opinions.
>> >
>> > I see two scenarios where different values for starting and rescaling
>> would
>> > be appropriate:
>> >
>> > 1) Flink serverless providers may prefer the fastest possible job
>> startup
>> > time, which can also be achieved by setting a smaller value for the
>> > stabilization timeout, such as 1 second, in the WaitingForResources
>> state.
>> > Conversely, to ensure maximum job uptime, it would be prudent to
>> increase
>> > the stabilization period for rescaling to a higher value, such as 1
>> minute,
>> > to handle server/node maintenance effectively.
>> >
>> > 2) In Reactive mode, the stabilization period is set to 0 by default.
>> > Setting a different default value for the rescale state could enhance
>> job
>> > stability during node maintenance, especially since the parameter
>> > min-parallelism-increase is no longer applicable.
>> >
>> > Regards,
>> >
>> > Zdenek
>> >
>> > On Tue, Jul 16, 2024 at 5:49 PM Matthias Pohl 
>> wrote:
>> >
>> > > Thanks Zdenek for your proposal on aligning the resource control logic
>> > > within the AdaptiveScheduler and cleaning up the rescaling code.
>> > >
>> > > Consolidating the parameters and the code as part of the 2.0 release
>> > makes
>> > > sense in my opinion: The proposed change adds consistent behavior to
>> the
>> > > WaitingForRes

Re: [DISCUSS] FLIP-XXX: Aligning timeout logic in the AdaptiveScheduler's WaitingForResources and Executing states

2024-07-30 Thread Zdenek Tison
Hi,

If there are no further comments, I would propose starting a vote on these
changes. But first, I would like to ask a committer to migrate the draft to
an FLIP in the Flink Wiki.

Thanks a lot.

Kind Regards,

Zdenek

On Tue, Jul 30, 2024 at 10:36 AM Zdenek Tison  wrote:

> Hi all,
>
> Based on the discussion, I added a new configuration:
> *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*.
> We considered the following options for the default value:
>
>1. Use a separate default value, e.g., 60s.
>2. Fallback to
>*jobmanager.adaptive-scheduler.resource-stabilization-timeout*.
>3. Use the value from
>*jobmanager.adaptive-scheduler.scaling-interval.max.*
>4. Use a large number like Duration.ofMillis(Long.MAX_VALUE).
>
> We decided against option 2) because, as discussed in the mailing list,
> the value can be too low. Option 3 was also ruled out since it can be too
> high or unset, and *scaling-interval.max*
> serves a different use case (it works well with *parallelism-increase*).
> Option 4 was not chosen because it would affect existing jobs after
> migration. After migrating to the new Flink version, rescaling would only
> happen if the desired resources were available. However, rescaling happened
> with every resource change before migration.
>
> Therefore, I prefer a new default value: 60s.
>
>
> Additionally, we reviewed the current set of parameters and think there is
> a chance to align the parameters with their functionality in the 2.0
> release. So, we propose these parameters:
> *jobmanager.adaptive-scheduler.submission.resource-stabilization-timeout*
> *jobmanager.adaptive-scheduler.submission.resource-wait-timeout*
>
> *jobmanager.adaptive-scheduler.executing.cooldown-after-rescaling*
> *jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout*
>
> *jobmanager.adaptive-scheduler.executing.rescale-trigger.max-checkpoint-failures*
> *jobmanager.adaptive-scheduler.executing.rescale-trigger.max-delay*
>
> Link to the updated FLIP doc.
> 
>
> Thanks a lot.
>
> Regards,
> Zdenek
>
> On Wed, Jul 24, 2024 at 2:22 PM Zdenek Tison  wrote:
>
>> Hi Gyula,
>>
>> Thank you for reviewing the document and providing feedback.
>>
>>1. I agree that we need two separate parameters for stabilization
>>intervals in different states. I will update the FLIP document 
>> accordingly.
>>2. That's correct. We reached the same conclusion while prototyping
>>the implementation. I will add a new bullet point to the FLIP document.
>>
>> Thanks a lot.
>>
>> Regards,
>> Zdenek
>>
>>
>> On Tue, Jul 23, 2024 at 3:02 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> Thank you for the proposal, I think it will be great to simplify the
>>> current rescaling flow to make it more digestible :)
>>>
>>> I have 2 comments:
>>>
>>> 1. Related to what Matthias already pointed out, I think in production
>>> scenarios it may be a typical requirement to have a fairly short
>>> stabilization interval for job startup (reduce downtime) but overall a
>>> longer stabilization period for Executing jobs before rescaling to avoid
>>> fluctuations and therefore reduce downtime. I think it would be very
>>> important to have 2 configs for that, one could fall back to the other of
>>> course if undefined.
>>>
>>> 2. The document mentions that the stabilization period for executing jobs
>>> is measured from the first resource event. I feel that if, after the
>>> stabilization period, we don't have sufficient resources, we should
>>> completely reset this timer and start the timeout from 0 when the next
>>> event arrives. This would be more in line with the concept of
>>> stabilization; otherwise, if you receive a batch of new resources, you may
>>> not utilize them because we rescale immediately as soon as resources are
>>> sufficient.
>>>
>>> Cheers,
>>> Gyula
>>>
>>>
>>>
>>> On Thu, Jul 18, 2024 at 9:58 AM Zdenek Tison wrote:
>>>
>>> > Thanks, Matthias, for your opinions.
>>> >
>>> > I see two scenarios where different values for starting and rescaling
>>> would
>>> > be appropriate:
>>> >
>>> > 1) Flink serverless providers may prefer the fastest possible job
>>> startup
>>> > time, which can also be achieved by setting a smaller value for the
>>> > stabilization timeout, such as 1 second, in the WaitingForResources
>>> state.
>>> > Conversely, to ensure maximum job uptime, it would be prudent to
>>> increase
>>> > the stabilization period for rescaling to a higher value, such as 1
>>> minute,
>>> > to handle server/node maintenance effectively.
>>> >
>>> > 2) In Reactive mode, the stabilization period is set to 0 by default.
>>> > Setting a different default value for the rescale state could enhance
>>> job
>>> > stability during node maintenance, especially since the parameter
>>> > min-parallelism-increase is no longer applicable.
>>> >
>>> > Reg

[jira] [Created] (FLINK-35927) Support closing ForSt meta file when using object storage

2024-07-30 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-35927:


 Summary: Support closing ForSt meta file when using object storage
 Key: FLINK-35927
 URL: https://issues.apache.org/jira/browse/FLINK-35927
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35928) ForSt supports compiling with RocksDB

2024-07-30 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-35928:


 Summary: ForSt supports compiling with RocksDB
 Key: FLINK-35928
 URL: https://issues.apache.org/jira/browse/FLINK-35928
 Project: Flink
  Issue Type: Sub-task
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-468: Introducing StreamGraph-Based Job Submission.

2024-07-30 Thread Zhu Zhu
Thanks for sharing the thoughts, David!

IIUC, there are two goals in making the APIs used for RPCs as
versionless as possible:
1. no version mismatch happens if no code changes
2. supports different versioned Flink clients and clusters
The first goal can be achieved by explicitly assigning a serialVersionUID
to serializable classes, which is already required in Flink code style.
Therefore, it should not be a major problem.
The second goal requires introducing versioned serializers and rewriting
classes to serialize. It can also bring benefits like better performance.
However, it is not limited to the job submission API. And it would require
a thorough and maybe complex design. Therefore, I think it's better to do
it in an individual FLIP.
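
As a minimal illustration of the first goal (the class name below is
hypothetical), an explicit serialVersionUID keeps Java serialization stable
across builds as long as the class itself does not change incompatibly:

    // Hypothetical payload class; the explicit serialVersionUID (required by
    // Flink's code style) avoids spurious version mismatches caused by
    // compiler-generated UIDs changing between builds.
    public class ExampleRpcPayload implements java.io.Serializable {
        private static final long serialVersionUID = 1L;

        private final String name;

        public ExampleRpcPayload(String name) {
            this.name = name;
        }
    }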

Regarding submitting a StreamGraph or a list of transformations with
configuration, I think StreamGraph is simple enough. It contains fields
like operators, partitioners and configuration, which are already
serialized into the JobGraph nowadays, which means there is little work and
risk in making it serializable. Directly serializing and submitting
transformations is also acceptable at first glance, but
SinkTransformation seems to be a blocker, as Junrui just mentioned.

Thanks,
Zhu

Junrui Lee wrote on Tue, Jul 30, 2024 at 12:13:

> Hi David,
>
> Thank you very much for your detailed explanation, which is crucial in
> helping to further improve this FLIP.
>
> This FLIP is applicable to both batch and stream processing. For batch
> processing, it can be used to optimize the StreamGraph (e.g., FLIP-469),
> while for streaming, we can use the StreamGraph to show a detailed logical
> plan at runtime (e.g., FLINK-33230) and potentially retain possible logical
> topology optimizations in the future (e.g., performing intelligent chain
> breaks while ensuring state compatibility).
>
> Overall, whether the submission is based on the existing JobGraph or the
> proposed StreamGraph, it is invisible to users, and its REST API is
> internal because the JobGraph and StreamGraph are internal classes.
> Although it is documented, we could consider removing it from the official
> documentation.
>
> Regarding your mention of "making the RPC APIs as versionless as possible,"
> I think your viewpoint is correct and highly valuable. I have carefully
> considered your suggestion of serializing a list of transformations and a
> configuration. This is indeed a step towards making the APIs used for RPCs
> as versionless as possible.
> However, I think this task is much more complex than serializing a
> StreamGraph, as it requires ensuring that each subclass of transformation
> and its properties are serializable. This obviously adds a significant
> amount of complexity. For example, the SinkTransformation includes
> DataStream properties, which have many unserializable fields, such as the
> StreamExecutionEnvironment.
> Moreover, this solution does not completely solve the problem of RPC API
> versioning.
>
> Therefore, as this FLIP does not truly change a public REST API, I think we
> can narrow down the scope of this FLIP a bit, focusing on how to enable the
> JM to see and operate on StreamGraph.
> In my understanding, the current proposal will not complicate the future work
> if Flink tries to make its REST API more versionless, e.g. directly submit
> transformations. Instead, most of the work can be reused, like creating
> JobGraph at runtime.
>
> WDYT? Looking forward to your feedback
>
> Best,
> Junrui
>
David Morávek wrote on Mon, Jul 29, 2024 at 21:34:
>
> > Hi all,
> >
> > My main concern is the absence of a comprehensive vision for how we could
> > make this work for both Batch and Streaming. Right now, it feels like the
> > proposal is solely centered around very specific batch optimizations.
> >
> > I’m inclined to support submitting StreamGraph because it could already
> > > provide the actual logical plan of the job at runtime. To be honest,
> I’m
> > > not sure what additional benefits submitting Transformations would
> bring
> > > compared to StreamGraph. I would appreciate any insights that you might
> > > offer on this matter.
> >
> >
> > This FLIP actually proposes a new job representation, so it would be
> great
> > to learn from the mistakes of the JobGraph. The main drawback of the
> > JobGraph is its very tight coupling with the particular Flink version.
> Even
> > a small patch version difference between client and server can make the
> > JobGraph invalid due to the nature of Java Serializability. StreamGraph
> is
> > exposed to an even bigger surface area with more internal data structures
> > that have the potential to make this problem more visible.
> >
> > In general, it would be highly valuable to make the APIs used for RPCs as
> > versionless as possible. If you were to write a custom serializer for the
> > StreamGraph, you’d actually find that you don’t need more than a list of
> > transformations and a configuration. Passing those to the
> > StreamGraphGenerator will produce a determin

[jira] [Created] (FLINK-35929) In flink insert mode, it supports modifying the parallelism of jdbc sink when the parallelism of source and sink is the same.

2024-07-30 Thread Qiu (Jira)
Qiu created FLINK-35929:
---

 Summary: In flink insert mode, it supports modifying the 
parallelism of jdbc sink when the parallelism of source and sink is the same.
 Key: FLINK-35929
 URL: https://issues.apache.org/jira/browse/FLINK-35929
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
Reporter: Qiu
 Attachments: image-2024-07-30-19-57-45-033.png, 
image-2024-07-30-19-57-50-868.png

In insert mode, when the source and sink parallelism are consistent, reducing 
or increasing the JDBC sink parallelism causes SQL validation to report an 
error. The error message is:

configured sink parallelism is: 8, while the input parallelism is: -1. Since 
configured parallelism is different from input parallelism and the changelog 
mode contains [INSERT,UPDATE_AFTER,DELETE], which is not INSERT_ONLY mode, 
primary key is required but no primary key is found!
{code:java}
// module: flink-connector-jdbc
// class: org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink

public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    validatePrimaryKey(requestedMode);
    // INSERT is always part of the sink's changelog mode.
    ChangelogMode.Builder changelogModeBuilder =
            ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT);

    // With a primary key, UPDATE_AFTER and DELETE are added as well, so the
    // mode is no longer INSERT_ONLY and the planner's parallelism check
    // (quoted above) requires a primary key when the configured sink
    // parallelism differs from the input parallelism.
    if (tableSchema.getPrimaryKey().isPresent()) {
        changelogModeBuilder
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE);
    }

    return changelogModeBuilder.build();
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35930) Add FORMAT_STRING function

2024-07-30 Thread Dylan He (Jira)
Dylan He created FLINK-35930:


 Summary: Add FORMAT_STRING function
 Key: FLINK-35930
 URL: https://issues.apache.org/jira/browse/FLINK-35930
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dylan He


Add a FORMAT_STRING function, the same as in Spark.
This function is a synonym for the PRINTF function.


Returns a formatted string from printf-style format strings.

Example:
{code:sql}
> SELECT FORMAT_STRING('Hello World %d %s', 100, 'days');
 Hello World 100 days
{code}

Syntax:
{code:sql}
FORMAT_STRING(strfmt, obj...)
{code}

Arguments:
* {{strfmt}}: A STRING expression.
* {{obj}}: ANY expression.

Returns:
A STRING.
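
For reference, the semantics roughly mirror Java's printf-style formatting, e.g. {{String.format}} (an analogy, not necessarily the actual implementation):
{code:java}
// Analogy: FORMAT_STRING('Hello World %d %s', 100, 'days')
String s = String.format("Hello World %d %s", 100, "days");
// s == "Hello World 100 days"
{code}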

See also:
 * [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
 * [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/format_string.html]
 * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-StringFunctions] printf
 * [PostgreSQL|https://www.postgresql.org/docs/16/functions-string.html] format



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35931) Add REGEXP_EXTRACT_ALL function

2024-07-30 Thread Dylan He (Jira)
Dylan He created FLINK-35931:


 Summary: Add REGEXP_EXTRACT_ALL function
 Key: FLINK-35931
 URL: https://issues.apache.org/jira/browse/FLINK-35931
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dylan He


Add REGEXP_EXTRACT_ALL function.

Extracts all of the strings in {{str}} that match the {{regexp}} expression and 
correspond to the regex group {{idx}}.

Example:
{code:sql}
> SELECT REGEXP_EXTRACT_ALL('100-200, 300-400', '(\\d+)-(\\d+)', 1);
 [100, 300]
> SELECT REGEXP_EXTRACT_ALL('100-200, 300-400', r'(\d+)-(\d+)', 1);
 ["100","300"]
{code}

Syntax:
{code:sql}
REGEXP_EXTRACT_ALL(str, regexp[, idx])
{code}

Arguments:
 * {{str}}: A STRING expression to be matched.
 * {{regexp}}: A STRING expression with a matching pattern.
 * {{idx}}: An optional INTEGER expression greater than or equal to 0, with a default of 1.

Returns:
An ARRAY.
{{regexp}} must be a Java regular expression.
When using literals, use `raw-literal` (`r` prefix) to avoid escape character 
pre-processing.
{{regexp}} may contain multiple groups. {{idx}} indicates which regex group to 
extract. An {{idx}} of 0 means matching the entire regular expression.
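
Since {{regexp}} must be a Java regular expression, the extraction semantics can be sketched with java.util.regex (a rough analogy, not the actual implementation):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Analogy only: REGEXP_EXTRACT_ALL('100-200, 300-400', '(\\d+)-(\\d+)', 1)
public class RegexpExtractAllSketch {
    public static void main(String[] args) {
        List<String> result = new ArrayList<>();
        // idx = 1 selects the first capturing group of each match.
        Matcher m = Pattern.compile("(\\d+)-(\\d+)").matcher("100-200, 300-400");
        while (m.find()) {
            result.add(m.group(1));
        }
        System.out.println(result); // [100, 300]
    }
}
{code}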

See also:
 * [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
 * [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/regexp_extract_all.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35932) Add REGEXP_COUNT function

2024-07-30 Thread Dylan He (Jira)
Dylan He created FLINK-35932:


 Summary: Add REGEXP_COUNT function
 Key: FLINK-35932
 URL: https://issues.apache.org/jira/browse/FLINK-35932
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dylan He


Add REGEXP_COUNT function.

Returns the number of times the {{regexp}} pattern matches in {{str}}.

Example:
{code:sql}
> SELECT REGEXP_COUNT('Steven Jones and Stephen Smith are the best players', 'Ste(v|ph)en');
 2
> SELECT REGEXP_COUNT('Mary had a little lamb', 'Ste(v|ph)en');
 0
{code}

Syntax:
{code:sql}
REGEXP_COUNT(str, regexp)
{code}

Arguments:
 * {{str}}: A STRING expression to be matched.
 * {{regexp}}: A STRING expression with a matching pattern.

Returns:
An INTEGER.

{{regexp}} must be a Java regular expression.
When using literals, use `raw-literal` (`r` prefix) to avoid escape character 
pre-processing.
If either argument is {{NULL}}, the result is {{NULL}}.
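
As with REGEXP_EXTRACT_ALL, the counting semantics can be sketched with java.util.regex (a rough analogy, not the actual implementation):
{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Analogy only: REGEXP_COUNT('Steven Jones and Stephen Smith ...', 'Ste(v|ph)en')
public class RegexpCountSketch {
    public static void main(String[] args) {
        Matcher m = Pattern.compile("Ste(v|ph)en")
                .matcher("Steven Jones and Stephen Smith are the best players");
        int count = 0;
        while (m.find()) {
            count++; // one increment per non-overlapping match
        }
        System.out.println(count); // 2
    }
}
{code}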

See also:
 * [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
 * [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/regexp_count.html]
 * [PostgreSQL|https://www.postgresql.org/docs/16/functions-string.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-468: Introducing StreamGraph-Based Job Submission.

2024-07-30 Thread David Morávek
Here is a summary of my current understanding; please correct me if it's
wrong.

We currently have an API for submitting a list of transformations and
configurations called CompiledPlan. However, it is currently specific to
the SQL domain.

In reviewing the follow-up FLIPs, it is evident that we are targeting graph
changes exclusively for the SQL landscape. Implementing these changes for
the DataStream API may not be feasible, as we lack the semantic information
needed to understand the underlying pipeline.

There appear to be three main categories of issues:

1. JobGraph Changes: To recompute the JobGraph based on new evidence, we
need to have a clear understanding of the pipeline, which is likely
exclusive to SQL. This requires the use of SG/CompiledPlan.
2. Partitioning Changes: These primarily involve key group redistributions
and do not necessitate changes to the JobGraph. Such optimizations can be
applied to both DataStream and SQL.
3. Changes to WebUI: These changes can be addressed at the JobGraph level
by providing additional context to JsonPlan, without requiring
SG/CompiledPlan.

Given these points, is it accurate to say that this FLIP is strictly
necessary for TableAPI/SQL jobs only?

Best,
D.

On Tue, Jul 30, 2024 at 11:25 AM Zhu Zhu  wrote:

> Thanks for sharing the thoughts, David!
>
> IIUC, there are two goals in making the APIs used for RPCs as
> versionless as possible:
> 1. no version mismatch happens if no code changes
> 2. supports different versioned Flink clients and clusters
> The first goal can be achieved by explicitly assigning a serialVersionUID
> to serializable classes, which is already required in Flink code style.
> Therefore, it should not be a major problem.
> The second goal requires introducing versioned serializers and rewriting
> classes to serialize. It can also bring benefits like better performance.
> However, it is not limited to the job submission API. And it would require
> a thorough and maybe complex design. Therefore, I think it's better to do
> it in an individual FLIP.
>
> Regarding submitting a StreamGraph or a list of transformations with
> configuration, I think StreamGraph is simple enough. It contains fields
> like operators, partitioners and configuration, which are already
> serialized into the JobGraph nowadays, which means there is little work and
> risk in making it serializable. Directly serializing and submitting
> transformations is also acceptable at first glance, but
> SinkTransformation seems to be a blocker, as Junrui just mentioned.
>
> Thanks,
> Zhu
>
> Junrui Lee wrote on Tue, Jul 30, 2024 at 12:13:
>
> > Hi David,
> >
> > Thank you very much for your detailed explanation, which is crucial in
> > helping to further improve this FLIP.
> >
> > This FLIP is applicable to both batch and stream processing. For batch
> > processing, it can be used to optimize the StreamGraph (e.g., FLIP-469),
> > while for streaming, we can use the StreamGraph to show a detailed
> logical
> > plan at runtime (e.g., FLINK-33230) and potentially retain possible
> logical
> > topology optimizations in the future (e.g., performing intelligent chain
> > breaks while ensuring state compatibility).
> >
> > Overall, whether the submission is based on the existing JobGraph or the
> > proposed StreamGraph, it is invisible to users, and its REST API is
> > internal because the JobGraph and StreamGraph are internal classes.
> > Although it is documented, we could consider removing it from the
> official
> > documentation.
> >
> > Regarding your mention of "making the RPC APIs as versionless as
> possible,"
> > I think your viewpoint is correct and highly valuable. I have carefully
> > considered your suggestion of serializing a list of transformations and a
> > configuration. This is indeed a step towards making the APIs used for
> RPCs
> > as versionless as possible.
> > However, I think this task is much more complex than serializing a
> > StreamGraph, as it requires ensuring that each subclass of transformation
> > and its properties are serializable. This obviously adds a significant
> > amount of complexity. For example, the SinkTransformation includes
> > DataStream properties, which have many unserializable fields, such as the
> > StreamExecutionEnvironment.
> > Moreover, this solution does not completely solve the problem of RPC API
> > versioning.
> >
> > Therefore, as this FLIP does not truly change a public REST API, I think
> we
> > can narrow down the scope of this FLIP a bit, focusing on how to enable
> the
> > JM to see and operate on StreamGraph.
> > In my understanding, the current proposal will not complicate the future work
> > if Flink tries to make its REST API more versionless, e.g. directly
> submit
> > transformations. Instead, most of the work can be reused, like creating
> > JobGraph at runtime.
> >
> > WDYT? Looking forward to your feedback
> >
> > Best,
> > Junrui
> >
> > David Morávek wrote on Mon, Jul 29, 2024 at 21:34:
> >
> > > Hi all,
> > >
> > > My main concern is

Re: [DISCUSS] FLIP-470: Support Adaptive Broadcast Join

2024-07-30 Thread Lincoln Lee
Thanks Xia for your explanation!

I can understand your concern, but considering that the design of this FLIP
already covers runtime de-optimization of inaccurate compile-time
optimizations, is it necessary to make the user manually turn off
'table.optimizer.join.broadcast-threshold' and set the new
'table.optimizer.adaptive.join.broadcast-threshold' again? Another option is
that users only need to focus on the existing broadcast size threshold,
accept the reality that 100% accurate optimization cannot be done at compile
time, and adopt the new capability of dynamic optimization at runtime;
ultimately, users will trust that Flink will always optimize accurately.
From this point of view, I would prefer a generic parameter
'table.optimizer.adaptive-optimization.enabled', which would allow for more
dynamic optimization in the future, not limited to broadcast join scenarios,
and would not continuously introduce new options. WDYT?
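
For illustration, a minimal sketch of how such a generic switch could be
declared (the option name is from this thread; the default value and where it
would live are assumptions on my side):

    public static final ConfigOption<Boolean> ADAPTIVE_OPTIMIZATION_ENABLED =
            ConfigOptions.key("table.optimizer.adaptive-optimization.enabled")
                    .booleanType()
                    .defaultValue(true) // assumed default; the FLIP would decide
                    .withDescription(
                            "Enables runtime adaptive optimizations such as"
                                    + " adaptive broadcast join.");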


Best,
Lincoln Lee


Xia Sun wrote on Tue, Jul 30, 2024 at 11:27:

> Hi Lincoln,
>
> Thank you for your input and participation in the discussion!
>
> Compared to introducing the 'table.optimizer.adaptive-join.enabled' option,
> introducing the "table.optimizer.adaptive.join.broadcast-threshold" can
> also cover the need to disable static broadcast optimization while only
> enabling dynamic broadcast optimization. From this perspective, introducing
> a new threshold configuration might be more appropriate. What do you think?
>
> Best,
> Xia
>
> Lincoln Lee wrote on Mon, Jul 29, 2024 at 23:12:
>
> > +1 for this useful optimization!
> >
> > I have a question about the new option: do we really need two broadcast
> > join thresholds? IIUC, this adaptive broadcast join is a complement to
> > compile-time optimization, so there is no need for the user to configure
> > two different thresholds (other than the 'off' value represented by -1).
> > Since we just want to control the adaptive optimization itself, should we
> > provide a configuration option like 'table.optimizer.adaptive-join.enabled'
> > or a more general one, 'table.optimizer.adaptive-optimization.enabled', for
> > such related optimizations?
> >
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Ron Liu wrote on Fri, Jul 26, 2024 at 11:59:
> >
> > > Hi, Xia
> > >
> > > Thanks for your reply. It looks good to me.
> > >
> > >
> > > Best,
> > > Ron
> > >
> > > Xia Sun wrote on Fri, Jul 26, 2024 at 10:49:
> > >
> > > > Hi Ron,
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > -> creation of the join operators until runtime
> > > >
> > > >
> > > > That means when creating the AdaptiveJoinOperatorFactory, we will not
> > > > immediately create the JoinOperator. Instead, we only pass in the
> > > necessary
> > > > parameters for creating the JoinOperator. The appropriate
> JoinOperator
> > > will
> > > > be created during the StreamGraphOptimizationStrategy optimization
> > phase.
> > > >
> > > > You mentioned that the runtime's visibility into the table planner is
> > > > indeed an issue. It includes two aspects,
> > > > (1) we plan to place both implementations of the
> > > > AdaptiveBroadcastJoinOptimizationStrategy and
> > AdaptiveJoinOperatorFactory
> > > > in the table layer. During the runtime phase, we will obtain the
> > > > AdaptiveBroadcastJoinOptimizationStrategy through class loading.
> > > Therefore,
> > > > the flink-runtime does not need to be aware of the table layer's
> > > > implementation.
> > > > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory
> needs
> > to
> > > > be aware of the table planner, we will consider placing the
> > > > AdaptiveJoinOperatorFactory in the table planner module as well.
> > > >
> > > >
> > > >  -> When did you configure these optimization strategies uniformly
> into
> > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`
> > > >
> > > >
> > > > Thank you for pointing out this issue. When there are multiple
> > > > StreamGraphOptimizationStrategies, the optimization order at the
> > runtime
> > > > phase will strictly follow the order specified in the configuration
> > > option
> > > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > > Therefore,
> > > > it is necessary to have a unified configuration during the sql
> planner
> > > > phase to ensure the correct optimization order. Currently, we are
> > > > considering performing this unified configuration in
> > > > BatchPlanner#afterTranslation().
> > > >
> > > > For simplicity, as long as the adaptive broadcast join/skewed join
> > > > optimization features are enabled (e.g.,
> > > > `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> > > > corresponding strategy will be configured. This optimization is
> > > independent
> > > > of the specific SQL query, although it might not produce any actual
> > > effect.
> > > >
> > > > Best,
> > > > Xia
> > > >
> > > > Ron Liu wrote on Wed, Jul 24, 2024 at 14:10:
> > > >
> > > > > Hi, Xia
> > > > >
> > > > > This FLIP looks good to me, +1.
> > > > >
> > > > > I've two questions:
> > > > >
> > > >

Re: [DISCUSS] FLIP-471: Fixing watermark idleness timeout accounting

2024-07-30 Thread Piotr Nowojski
Thanks Zakelly. I will stick with the current proposal (given 2:1 in favour
of having a separate `RelativeClock`).

I will open a voting thread soon.

Best,
Piotrek
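
For reference, a minimal sketch of the `RelativeClock` interface as discussed
(the method name follows the relativeTimeNanos value mentioned below; the
exact signature in the FLIP may differ):

    public interface RelativeClock {
        // Relative time in nanoseconds. Unlike absolute time, it is not
        // guaranteed to advance at wall-clock speed (e.g. it may pause).
        // Sketch only; see the FLIP for the authoritative definition.
        long relativeTimeNanos();
    }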

On Tue, Jul 30, 2024 at 06:04 Zakelly Lan wrote:

> Hi Piotr,
>
> I'd prefer to change the semantics of the existing `Clock`, as forcing the
> relative time and absolute time to run at the same speed limits the use of
> this clock. Anyway, I have no strong feelings. I understand your concern
> and introducing a new interface is also fine with me.
>
>
> Best,
> Zakelly
>
> On Mon, Jul 29, 2024 at 6:16 PM Piotr Nowojski 
> wrote:
>
> > Hi Arvid,
> >
> > > Yes, I think it would be sufficient to say that the relative time
> doesn't
> > > need to move at the same speed as wall clock without going into details
> > > such as backpressure (that should be moved to the doc of the
> WMContext).
> >
> > Makes sense. I've updated the FLIP. In the final PR I will try to phrase
> > that
> > better with a bit more detail.
> >
> > > I understand your concern and we can keep the original semantics. I'd
> > still
> > > like to see the name of the accessor to be a bit more specific for the
> > task
> > > at hand. RelativeClock is a very broad name because it's supposed to be
> > > generic. Then we should have a specific name for the clock in the
> > context,
> > > such as getInputActivityClock or so.
> >
> > +1
> >
> > I like this name and I've updated the FLIP.
> >
> > Best,
> > Piotrek
> >
> > On Fri, Jul 26, 2024 at 14:47 Arvid Heise wrote:
> >
> > > Hi Piotr,
> > >
> > > > > 1. `RelativeClock` looks like a super-interface of
> > > > > `org.apache.flink.util.clock.Clock`. Should we also reflect that
> > > > > accordingly by extending? This should not break anything.
> > > >
> > > > It should be fine to do so, but to me the question is if we should do
> > so?
> > > > It doesn't give any benefit right now,
> > > > if necessary either we or someone else can do that in the future.
> While
> > > in
> > > > the meantime it makes the APIs
> > > > somehow a bit more strict/more defined, so a bit higher chance of
> > causing
> > > > some problems if we decide to
> > > > change something.
> > > One obvious advantage is that we could use ManualClock for testing.
> Other
> > > than that, I'd be fine to leave as-is and adjust later.
> > >
> > > >
> > > > > 2. Since `RelativeClock` is rather general, I'd also propose to
> > change
> > > > the
> > > > > javadoc to not include the pausing part.
> > > >
> > > > My intention was not to enforce in the contract that progress of the
> > > timer
> > > > must be paused, but the interface
> > > > doesn't guarantee that the returned relativeTimeNanos value will
> follow
> > > the
> > > > wall clock 1 to 1. That concrete
> > > > classes can pause progress of that time if needed. I wanted to state
> > that
> > > > clearly, to better highlight the
> > > > difference against the `Clock`.
> > > >
> > > > Maybe the java doc should be clarified?
> > > Yes, I think it would be sufficient to say that the relative time
> doesn't
> > > need to move at the same speed as wall clock without going into details
> > > such as backpressure (that should be moved to the doc of the
> WMContext).
> > >
> > > >
> > > > > 4. I now realized that I changed semantics compared to the
> proposal:
> > > this
> > > > > idle clock would already calculate the time difference (now - last
> > > > event).
> > > > > That may narrow down the usefulness but makes the only known use
> case
> > > > more
> > > > > explicit.
> > > > >
> > > > > Do we have other (future) use cases that could profit from having
> > > access
> > > > to
> > > > > the relative time of the last event occurring? Basically would we
> > ever
> > > > need
> > > > > (now - some event) or (now - some periodic time)?
> > > >
> > > > I'm not sure. Idle clock seems a bit too specific to me. Also, at
> least
> > > > currently, idle time at the watermark
> > > > generator level is not calculated accurately. Exposing an approximate
> > > value
> > > > via API might make some
> > > > false impression that it's accurate?
> > > I understand your concern and we can keep the original semantics. I'd
> > still
> > > like to see the name of the accessor to be a bit more specific for the
> > task
> > > at hand. RelativeClock is a very broad name because it's supposed to be
> > > generic. Then we should have a specific name for the clock in the
> > context,
> > > such as getInputActivityClock or so.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > >
> > > On Fri, Jul 26, 2024 at 12:21 PM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Zakelly,
> > > >
> > > > > Could we just use `org.apache.flink.util.clock.Clock` instead of
> > > > introducing a
> > > > > new one?
> > > >
> > > > We can not use it as is, because of this java doc in it:
> > > >
> > > >  * Relative Time
> > > >  *
> > > >  * This time advances at the same speed as the absolute
> time,
> > > but
> > > > the timestamps can only
> > > >  * be referred to re

[VOTE] FLIP-471: Fixing watermark idleness timeout accounting

2024-07-30 Thread Piotr Nowojski
Hi all!

I would like to open the vote for FLIP-471 [1]. It has been discussed here
[2].

The vote will remain open for at least 72 hours.

Best,
Piotrek

[1] https://cwiki.apache.org/confluence/x/oQvOEg
[2] https://lists.apache.org/thread/byj1l2236rfx3mcl3v4374rcbkq4rf85


[jira] [Created] (FLINK-35933) Skip distributing maxAllowedWatermark if there are no subtasks

2024-07-30 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-35933:
-

 Summary: Skip distributing maxAllowedWatermark if there are no 
subtasks
 Key: FLINK-35933
 URL: https://issues.apache.org/jira/browse/FLINK-35933
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.20.0


On the JM, `SourceCoordinator#announceCombinedWatermark` executes unnecessarily 
if there are no subtasks to distribute maxAllowedWatermark to. This involves 
heap and ConcurrentHashMap accesses and lots of logging.
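
A minimal sketch of the proposed guard (the field name below is hypothetical; the actual SourceCoordinator internals may differ):
{code:java}
import java.util.concurrent.ConcurrentHashMap;

public class AnnounceWatermarkSketch {
    // Hypothetical stand-in for the coordinator's registered subtask gateways.
    private final ConcurrentHashMap<Integer, Object> subtaskGateways = new ConcurrentHashMap<>();

    void announceCombinedWatermark() {
        // Return early so heap/ConcurrentHashMap accesses and logging are
        // skipped entirely when there is no subtask to distribute to.
        if (subtaskGateways.isEmpty()) {
            return;
        }
        // ... compute and distribute maxAllowedWatermark ...
    }
}
{code}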



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35934) Add CompiledPlan annotations to BatchExecValues

2024-07-30 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-35934:
--

 Summary: Add CompiledPlan annotations to BatchExecValues
 Key: FLINK-35934
 URL: https://issues.apache.org/jira/browse/FLINK-35934
 Project: Flink
  Issue Type: Sub-task
Reporter: Jim Hughes
Assignee: Jim Hughes






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35935) CREATE TABLE AS doesn't work with LIMIT

2024-07-30 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-35935:
---

 Summary: CREATE TABLE AS doesn't work with LIMIT
 Key: FLINK-35935
 URL: https://issues.apache.org/jira/browse/FLINK-35935
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.1
Reporter: Xingcan Cui


{code:java}
CREATE TABLE WITH (foo) AS (SELECT * FROM bar LIMIT 5){code}
The above statement throws a "Caused by: java.lang.AssertionError: not a query:" 
exception.

A workaround is to wrap the query in a CTE.
{code:java}
CREATE TABLE WITH (foo) AS (WITH R AS (SELECT * FROM bar LIMIT 5) SELECT * FROM 
R){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35936) paimon cdc schema evolution failure when restart job

2024-07-30 Thread MOBIN (Jira)
MOBIN created FLINK-35936:
-

 Summary: paimon cdc schema evolution failure when restart job
 Key: FLINK-35936
 URL: https://issues.apache.org/jira/browse/FLINK-35936
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
 Environment: Flink 1.19

cdc master
Reporter: MOBIN


Paimon CDC schema evolution fails when the job is restarted.

Minimal reproduction steps:
 # stop the flink-cdc-mysql-to-paimon pipeline job
 # alter the MySQL table schema, e.g. add a column
 # start the pipeline job
 # the newly added column is not synchronized to the Paimon table



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-471: Fixing watermark idleness timeout accounting

2024-07-30 Thread Zakelly Lan
+1 (binding)


Best,
Zakelly

On Wed, Jul 31, 2024 at 12:07 AM Piotr Nowojski 
wrote:

> Hi all!
>
> I would like to open the vote for FLIP-471 [1]. It has been discussed here
> [2].
>
> The vote will remain open for at least 72 hours.
>
> Best,
> Piotrek
>
> [1] https://cwiki.apache.org/confluence/x/oQvOEg
> [2] https://lists.apache.org/thread/byj1l2236rfx3mcl3v4374rcbkq4rf85
>