[jira] [Created] (FLINK-32880) Redundant taskManager should be replenished in FineGrainedSlotManager

2023-08-16 Thread xiangyu feng (Jira)
xiangyu feng created FLINK-32880:


 Summary: Redundant taskManager should be replenished in 
FineGrainedSlotManager
 Key: FLINK-32880
 URL: https://issues.apache.org/jira/browse/FLINK-32880
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: xiangyu feng


Currently, if you are using FineGrainedSlotManager and a redundant 
taskmanager exits abnormally, no replacement taskmanager will be requested.
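The expected replenishment check can be sketched as follows. This is a minimal Python illustration of the idea only; the function name and bookkeeping are hypothetical, not FineGrainedSlotManager's actual code:

```python
def task_managers_to_request(live, required, redundant):
    """Return how many new task managers should be requested so that
    `required` workers plus `redundant` spares stay available.
    All names here are illustrative, not Flink's actual API."""
    deficit = (required + redundant) - live
    return max(deficit, 0)

# A redundant task manager exits abnormally: 10 required + 2 redundant,
# but only 11 remain alive, so one replacement should be requested.
print(task_managers_to_request(live=11, required=10, redundant=2))  # 1
```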



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32881) Client supports making savepoints in detach mode

2023-08-16 Thread Renxiang Zhou (Jira)
Renxiang Zhou created FLINK-32881:
-

 Summary: Client supports making savepoints in detach mode
 Key: FLINK-32881
 URL: https://issues.apache.org/jira/browse/FLINK-32881
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor, Client / Job Submission
Affects Versions: 1.19.0
Reporter: Renxiang Zhou
 Fix For: 1.19.0
 Attachments: image-2023-08-16-17-14-34-740.png, 
image-2023-08-16-17-14-44-212.png

When triggering a savepoint using the command-line tool, the client needs to 
wait for the job to finish creating the savepoint before it can exit. For jobs 
with large state, the savepoint creation process can be time-consuming, leading 
to the following problems:
 # Platform users may need to manage thousands of Flink tasks on a single 
client machine. With the current savepoint triggering mode, all savepoint 
creation threads on that machine have to wait for the job to finish the 
snapshot, resulting in significant resource waste;
 # If the savepoint production time exceeds the client's timeout duration, the 
client will throw a timeout exception and report that triggering the savepoint 
has failed. Since different jobs have varying savepoint durations, it is 
difficult to tune the client's timeout parameter.

Therefore, we propose adding a detach mode to trigger savepoints on the client 
side, just similar to the detach mode behavior when submitting jobs. Here are 
some specific details:
 # The savepoint UUID will be generated on the client side. After successfully 
triggering the savepoint, the client immediately returns the UUID information.
 # Add a "dump-pending-savepoints" API interface that allows the client to 
check whether the triggered savepoint has been successfully created.

By implementing these changes, the client can detach from the savepoint 
creation process, reducing resource waste, and providing a way to check the 
status of savepoint creation.
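The proposed client-side contract can be sketched like this. Only the two ideas come from the proposal (client-generated UUID returned immediately, and a "dump-pending-savepoints" style check); the class and method names are hypothetical, not Flink's API:

```python
import uuid

class DetachedSavepointClient:
    """Illustrative sketch of the proposed detached-savepoint flow."""

    def __init__(self):
        self._pending = {}  # savepoint id -> "IN_PROGRESS" | "COMPLETED"

    def trigger_savepoint_detached(self, job_id):
        # Generate the savepoint id on the client side and return at once,
        # without waiting for the (possibly long) snapshot to finish.
        sp_id = str(uuid.uuid4())
        self._pending[sp_id] = "IN_PROGRESS"
        return sp_id

    def dump_pending_savepoints(self):
        # Proposed "dump-pending-savepoints"-style check: report which
        # triggered savepoints have not completed yet.
        return [s for s, st in self._pending.items() if st == "IN_PROGRESS"]

client = DetachedSavepointClient()
sp = client.trigger_savepoint_detached("job-1")
print(sp in client.dump_pending_savepoints())  # True until the snapshot completes
```

A real implementation would keep the pending state on the cluster side; this sketch only shows the client-visible contract.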

!image-2023-08-16-17-14-34-740.png!!image-2023-08-16-17-14-44-212.png!





Re: [ANNOUNCE] Updates to Flink's external connector CI workflows

2023-08-16 Thread Etienne Chauchot

This is great! Thanks for working on this.

Best

Etienne

On 15/06/2023 at 13:19, Martijn Visser wrote:

Hi all,

I would like to inform you of two changes that have been made to the shared
CI workflow that's used for Flink's externalized connectors.

1. Up until now, weekly builds were running to validate that connector code
(still) works with Flink. However, these builds were only running for code
on the "main" branch of the connector, and not for the branches of the
connector (like v3.0 for Elasticsearch, v1.0 for Opensearch, etc.). This was
tracked under https://issues.apache.org/jira/browse/FLINK-31923.

That issue has now been fixed, with the Github Action workflow now
accepting a map with arrays, which can contain a combination of Flink
versions to test for and the connector branch it should test. See
https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml#L28-L47
for an example from the Flink JDBC connector.

This change has already been applied on the externalized connectors GCP
PubSub, RabbitMQ, JDBC, Pulsar, MongoDB, Opensearch, Cassandra,
Elasticsearch. AWS is pending the merging of the PR. For Kafka, Hive and
HBase, since they haven't finished externalization, this isn't applicable
to them yet.

2. When working on the debugging of a problem with the JDBC connector, one
of the things that was needed to debug that problem was the ability to see
the JVM thread dump. With https://issues.apache.org/jira/browse/FLINK-32331
now completed, every failed CI run will have a JVM thread dump. You can see
the implementation for that in
https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml#L161-L195

Best regards,

Martijn


[jira] [Created] (FLINK-32882) FineGrainedSlotManager cause NPE when clear pending taskmanager twice

2023-08-16 Thread Weihua Hu (Jira)
Weihua Hu created FLINK-32882:
-

 Summary: FineGrainedSlotManager cause NPE when clear pending 
taskmanager twice
 Key: FLINK-32882
 URL: https://issues.apache.org/jira/browse/FLINK-32882
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Weihua Hu
Assignee: Weihua Hu
 Attachments: image-2023-08-16-17-34-32-619.png

When a job finishes, we call 
processResourceRequirements(ResourceRequirements.empty) and 
clearResourceRequirements. Both methods trigger 
taskManagerTracker.clearPendingAllocationsOfJob(jobId) to release pending task 
managers early.
 
This causes an NPE, so we need to add a safety net for 
PendingTaskManager#clearPendingAllocationsOfJob.
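The safety net amounts to making the clear operation idempotent, as in this toy Python model of the double-clear problem (all names are illustrative, not the actual FineGrainedSlotManager code):

```python
class PendingTaskManagerTracker:
    """Toy model: clearing a job's pending allocations twice must not fail."""

    def __init__(self):
        self._pending_allocations = {"job-1": ["tm-1", "tm-2"]}

    def clear_pending_allocations_of_job(self, job_id):
        # Safety net: the job's entry may already have been removed by an
        # earlier call (e.g. processResourceRequirements(empty) followed by
        # clearResourceRequirements), so treat a missing key as a no-op
        # instead of dereferencing a missing entry.
        released = self._pending_allocations.pop(job_id, None)
        return released or []

tracker = PendingTaskManagerTracker()
print(tracker.clear_pending_allocations_of_job("job-1"))  # ['tm-1', 'tm-2']
print(tracker.clear_pending_allocations_of_job("job-1"))  # [] -- no error
```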





Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-16 Thread Piotr Nowojski
Hi Dong,

The Operators API is unfortunately also our public-facing API, and I think the
APIs that we will add there should also be marked `@Experimental` IMO.

The config options should also be marked as experimental (both
annotated @Experimental and noted the same thing in the docs,
if @Experimental annotation is not automatically mentioned in the docs).

> Alternatively, how about we add a doc for
checkpointing.interval-during-backlog explaining its impact/concern as
discussed above?

We should do this independently from marking the APIs/config options as
`@Experimental`.

Best,
Piotrek

Fri, 11 Aug 2023 at 14:55, Dong Lin  wrote:

> Hi Piotr,
>
> Thanks for the reply!
>
> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > Sorry for the long delay in responding!
> >
> > >  Given that it is an optional feature that can be
> > > turned off by users, it might be OK to just let users try it out and we
> > can
> > > fix performance issues once we detect any of them. What do you think?
> >
> > I think it's fine. It would be best to mark this feature as experimental,
> > and
> > we say that the config keys or the default values might change in the
> > future.
> >
>
> In general I agree we can mark APIs that determine "whether to enable
> dynamic switching between stream/batch mode" as experimental.
>
> However, I am not sure we have such an API yet. The APIs added in this FLIP
> are intended to be used by operator developers rather than end users. End
> users can enable this capability by setting
> execution.checkpointing.interval-during-backlog = Long.MAX and uses a
> source which might implicitly set backlog status (e.g. HybridSource). So
> execution.checkpointing.interval-during-backlog is the only user-facing
> API that can always control whether this feature can be used.
>
> However, execution.checkpointing.interval-during-backlog itself is not tied
> to FLIP-327.
>
> Do you mean we should set checkpointing.interval-during-backlog as
> experimental? Alternatively, how about we add a doc for
> checkpointing.interval-during-backlog explaining its impact/concern as
> discussed above?
>
> Best,
> Dong
>
>
> > > Maybe we can revisit the need for such a config when we
> introduce/discuss
> > > the capability to switch backlog from false to true in the future. What
> > do
> > > you think?
> >
> > Sure, we can do that.
> >
> > Best,
> > Piotrek
> >
> > Sun, 23 Jul 2023 at 14:32, Dong Lin  wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks a lot for the explanation. Please see my reply inline.
> > >
> > > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> > piotr.nowoj...@gmail.com>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks a lot for the answers. I can now only briefly answer your last
> > > > email.
> > > >
> > > > > It is possible that spilling to disks might cause larger overhead.
> > IMO
> > > it
> > > > > is an orthogonal issue already existing in Flink. This is because a
> > > Flink
> > > > > job running in batch mode might also be slower than its throughput in
> > > stream
> > > > > mode due to the same reason.
> > > >
> > > > Yes, I know, but the thing that worries me is that previously only a
> > user
> > > > alone
> > > > could decide whether to use batch mode or streaming, and in practice
> > one
> > > > user would rarely (if ever) use both for the same problem/job/query.
> If
> > > his
> > > > intention was to eventually process live data, he was using streaming
> > > even
> > > > if there was a large backlog at the start (apart from some very few
> very
> > > > power
> > > > users).
> > > >
> > > > With this change, we want to introduce a mode that would be switching
> > > back
> > > > and forth between streaming and "batch in streaming" automatically.
> So
> > a
> > > > potential performance regression would be much more visible and
> painful
> > > > at the same time. If a batch query runs slower than it could, it's
> > > > kind of fine as it will end at some point. If a streaming query,
> > > > during large back pressure from maybe a temporary load spike,
> > > > switches to batch processing, that's a bigger deal.
> > > > Especially if batch processing mode will not be able to actually even
> > > > handle
> > > > the normal load, after the load spike. In that case, the job could
> > never
> > > > recover
> > > > from the backpressure/backlog mode.
> > > >
> > >
> > > I understand you are concerned with the risk of performance regression
> > > introduced due to switching to batch mode.
> > >
> > > After thinking about this more, I think this existing proposal meets
> the
> > > minimum requirement of "not introducing regression for existing jobs".
> > The
> > > reason is that even if batch mode can be slower than stream mode for
> some
> > > operators in some cases, this is an optional feature that will only be
> > > enabled if a user explicitly overrides the newly introduced config to
> > > non-default values. Existing jobs that simply upgrade their Flink
> li

[DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-16 Thread Bonnie Arogyam Varghese
Platform providers may want to disable hints completely for security
reasons.

Currently, there is a configuration to disable OPTIONS hint -
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled

However, there is no configuration available to disable QUERY hints -
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints

The proposal is to add a new configuration:

Name: table.query-options.enabled
Description: Enables or disables QUERY hints. If disabled, an
exception is thrown when any QUERY hint is specified.
Note: The default value will be set to true.
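The proposed gate can be illustrated with a minimal Python sketch. Only the option name and the "throw if disabled" behavior come from the proposal; the function and exception here are hypothetical, not planner code:

```python
class HintDisabledError(Exception):
    """Raised when QUERY hints are used while disabled (illustrative)."""

def validate_hints(query_hints, query_hints_enabled):
    # Mirrors the proposed behavior of table.query-options.enabled:
    # when the flag is false, any QUERY hint in the statement is rejected.
    if not query_hints_enabled and query_hints:
        raise HintDisabledError(
            "QUERY hints are disabled via table.query-options.enabled")
    return query_hints

# Enabled (the proposed default): hints pass through unchanged.
print(validate_hints(["BROADCAST(t1)"], query_hints_enabled=True))
```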


[jira] [Created] (FLINK-32883) Support for standby task managers

2023-08-16 Thread Tomoyuki NAKAMURA (Jira)
Tomoyuki NAKAMURA created FLINK-32883:
-

 Summary: Support for standby task managers
 Key: FLINK-32883
 URL: https://issues.apache.org/jira/browse/FLINK-32883
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: Tomoyuki NAKAMURA


[https://docs.ververica.com/user_guide/application_operations/deployments/scaling.html#run-with-standby-taskmanager]
I would like to be able to support standby task managers, because on K8s pods 
are often evicted or deleted due to node failure or autoscaling.

With the current implementation, it is not possible to set up a standby task 
manager, and jobs cannot run until all task managers are up and running. If a 
standby task manager could be supported, jobs could continue to run without 
downtime using the standby task manager, even if the task manager is 
unexpectedly deleted.

[https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L370-L380]
If the job manager's number of replicas is set, the job's parallelism setting 
is ignored. It should be possible to support a standby task manager by 
automatically setting parallelism to replicas * task slots only when the job's 
parallelism is not set (i.e. 0), and using the explicit value when it is set.
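The proposed rule reduces to a small piece of arithmetic, sketched here in Python (the function is hypothetical, not the operator's actual FlinkConfigBuilder code):

```python
def effective_parallelism(job_parallelism, tm_replicas, task_slots):
    """Illustrative version of the proposed rule: when the job does not set
    parallelism (0), derive it from replicas * task slots; otherwise keep
    the explicit setting, so extra replicas act as standby task managers."""
    if job_parallelism == 0:
        return tm_replicas * task_slots
    return job_parallelism

print(effective_parallelism(0, 4, 2))  # unset -> derived: 4 * 2 = 8
print(effective_parallelism(6, 4, 2))  # explicit 6 kept; spare slots are standby
```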

If this change looks good, I will send a PR on GitHub.





Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs

2023-08-16 Thread liu ron
Hi, Jiangjie

Sorry for the late reply. Thank you for such a detailed response. As you say,
there are three behaviours here for users and I agree with you. The goal of
this FLIP is to clarify the behaviour of the client side, which I also
agree with. However, as weihua said, the config execution.attached is not
only for per-job mode, but also for session mode, but the FLIP says that
this is only for per-job mode, and this config will be removed in the
future because the per-job mode has been deprecated. I don't think this is
correct and we should change the description in the corresponding section
of the FLIP. Since execution.attached is used in session mode, there is a
compatibility issue here if we change it directly to
client.attached.after.submission, and I think we should make this clear in
the FLIP.

Best,
Ron

Becket Qin  wrote on Mon, Aug 14, 2023, 20:33:

> Hi Ron and Weihua,
>
> Thanks for the feedback.
>
> There seem to be three user-visible behaviors that we are talking about:
>
> 1. The behavior on the client side, i.e. whether blocking until the job
> finishes or not.
>
> 2. The behavior of the submitted job, whether to stop the job execution if the
> client is detached from the Flink cluster, i.e. whether bind the lifecycle
> of the job with the connection status of the attached client. For example,
> one might want to keep a batch job running until finish even after the
> client connection is lost. But it makes sense to stop the job upon client
> connection loss if the job invokes collect() on a streaming job.
>
> 3. The behavior of the Flink cluster (JM and TMs), whether to shut down the
> Flink cluster if the client is detached from the Flink cluster, i.e.
> whether bind the cluster lifecycle with the job lifecycle. For dedicated
> clusters (application cluster or dedicated session clusters), the lifecycle
> of the cluster should be bound with the job lifecycle. But for shared
> session clusters, the lifecycle of the Flink cluster should be independent
> of the jobs running in it.
>
> As we can see, these three behaviors are sort of independent, and the current
> configurations fail to support all the combinations of wanted behaviors.
> Ideally there should be three separate configurations, for example:
> - client.attached.after.submission and client.heartbeat.timeout control the
> behavior on the client side.
> - jobmanager.cancel-on-attached-client-exit controls the behavior of the
> job when an attached client lost connection. The client heartbeat timeout
> and attach-ness will be also passed to the JM upon job submission.
> - cluster.shutdown-on-first-job-finishes (or
> jobmanager.shutdown-cluster-after-job-finishes) controls the cluster
> behavior after the job finishes normally / abnormally. This is a cluster
> level setting instead of a job level setting. Therefore it can only be set
> when launching the cluster.
>
> The current code sort of combines config 2 and 3 into
> execution.shutdown-on-attach-exit.
> This assumes the life cycle of the cluster is the same as the job's when
> the client is attached. This FLIP does not intend to change that, but using
> the execution.attached config for the client behavior control looks
> misleading. So this FLIP proposes to replace it with a more intuitive
> config of client.attached.after.submission. This makes it clear that it is
> a configuration controlling the client side behavior, instead of the
> execution of the job.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
> On Thu, Aug 10, 2023 at 10:34 PM Weihua Hu  wrote:
>
> > Hi Allison
> >
> > Thanks for driving this FLIP. It's a valuable feature for batch jobs.
> > This helps keep "Drop Per-Job Mode [1]" going.
> >
> > +1 for this proposal.
> >
> > However, it seems that the change in this FLIP is not detailed enough.
> > I have a few questions.
> >
> > 1. The config 'execution.attached' is not only used in per-job mode,
> > but also in session mode to shutdown the cluster. IMHO, it's better to
> > keep this option name.
> >
> > 2. This FLIP only mentions YARN mode. I believe this feature should
> > work in both YARN and Kubernetes mode.
> >
> > 3. Within the attach mode, we support two features:
> > execution.shutdown-on-attached-exit
> > and client.heartbeat.timeout. These should also be taken into account.
> >
> > 4. The Application Mode will shut down once the job has been completed.
> > So, if we use the flink client to poll job status via REST API for attach
> > mode,
> > there is a chance that the client will not be able to retrieve the job
> > finish status.
> > Perhaps FLINK-24113[3] will help with this.
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-26000
> > [2]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> > [3] https://issues.apache.org/jira/browse/FLINK-24113
> >
> > Best,
> > Weihua
> >
> >
> > On Thu, Aug 10, 2023 at 10:47 AM liu ron  wrote:
> >
> > > Hi, Allison
> > >
> > > Thanks for driving this propo

Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-16 Thread liu ron
Hi,

Thanks for driving this proposal.

Can you explain why you would need to disable query hints because of
security issues? I don't really understand why query hints affect security.

Best,
Ron

Bonnie Arogyam Varghese  wrote on Wed, Aug 16, 2023,
23:59:

> Platform providers may want to disable hints completely for security
> reasons.
>
> Currently, there is a configuration to disable OPTIONS hint -
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled
>
> However, there is no configuration available to disable QUERY hints -
>
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints
>
> The proposal is to add a new configuration:
>
> Name: table.query-options.enabled
> Description: Enable or disable the QUERY hint, if disabled, an
> exception would be thrown if any QUERY hints are specified
> Note: The default value will be set to true.
>


Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-16 Thread Bonnie Arogyam Varghese
Hi Liu,
 Options hints could be a security concern since users can override
settings. However, query hints specifically could affect performance.
Since we have a config to disable Options hint, I'm suggesting we also have
a config to disable Query hints.

On Wed, Aug 16, 2023 at 9:41 AM liu ron  wrote:

> Hi,
>
> Thanks for driving this proposal.
>
> Can you explain why you would need to disable query hints because of
> security issues? I don't really understand why query hints affects
> security.
>
> Best,
> Ron
>
> Bonnie Arogyam Varghese  wrote on Wed, Aug 16, 2023,
> 23:59:
>
> > Platform providers may want to disable hints completely for security
> > reasons.
> >
> > Currently, there is a configuration to disable OPTIONS hint -
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled
> >
> > However, there is no configuration available to disable QUERY hints -
> >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints
> >
> > The proposal is to add a new configuration:
> >
> > Name: table.query-options.enabled
> > Description: Enable or disable the QUERY hint, if disabled, an
> > exception would be thrown if any QUERY hints are specified
> > Note: The default value will be set to true.
> >
>


Re: FLINK-20767 - Support for nested fields filter push down

2023-08-16 Thread Venkatakrishnan Sowrirajan
Btw, this is the FLIP proposal discussion thread. Please share your
thoughts. Thanks.

Regards
Venkata krishnan


On Sun, Aug 13, 2023 at 6:35 AM liu ron  wrote:

> Hi, Venkata krishnan
>
> Thanks for driving this work, look forward to your FLIP.
>
> Best,
> Ron
>
> Venkatakrishnan Sowrirajan  wrote on Sun, Aug 13, 2023, 14:34:
>
> > Thanks Yunhong. That's correct. I am able to make it work locally.
> > Currently, in the process of writing a FLIP for the necessary changes to
> > the SupportsFilterPushDown API to support nested fields filter push down.
> >
> > Regards
> > Venkata krishnan
> >
> >
> > On Mon, Aug 7, 2023 at 8:28 PM yh z  wrote:
> >
> > > Hi Venkatakrishnan,
> > > Sorry for the late reply. I have looked at the code and feel like you
> > need
> > > to modify the logic of the
> > > ExpressionConverter.visit(FieldReferenceExpression expression) method
> to
> > > support nested types,
> > > which are not currently supported in the current code.
> > >
> > > Regards,
> > > Yunhong Zheng (Swuferhong)
> > >
> > > Venkatakrishnan Sowrirajan  wrote on Mon, Aug 7, 2023, 13:30:
> > >
> > > > (Sorry, I pressed send too early)
> > > >
> > > > Thanks for the help @zhengyunhon...@gmail.com.
> > > >
> > > > Agree on not changing the API as much as possible as well as wrt
> > > > simplifying Projection pushdown with nested fields eventually as
> well.
> > > >
> > > > In terms of the code itself, currently I am trying to leverage the
> > > > FieldReferenceExpression to also handle nested fields for filter push
> > > down.
> > > > But where I am currently struggling to make progress is, once the
> > filters
> > > > are pushed to the table source itself, in
> > > >
> > PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable
> > > > there is a conversion from a List (containing
> > > > FieldReferenceExpression) to the List itself.
> > > >
> > > > If you have some pointers for that, please let me know. Thanks.
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <
> > > > vsowr...@asu.edu>
> > > > wrote:
> > > >
> > > > > Thanks @zhengyunhon...@gmail.com
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > >
> > > > > On Sun, Aug 6, 2023 at 6:16 PM yh z 
> > wrote:
> > > > >
> > > > >> Hi, Venkatakrishnan,
> > > > >> I think this is a very useful feature. I have been focusing on the
> > > > >> development of the flink-table-planner module recently, so if you
> > need
> > > > >> some
> > > > >> help, I can assist you in completing the development of some
> > sub-tasks
> > > > or
> > > > >> code review.
> > > > >>
> > > > >> Returning to the design itself, I think it's necessary to modify
> > > > >> FieldReferenceExpression or re-implement a
> > > > NestedFieldReferenceExpression.
> > > > >> As for modifying the interface of SupportsProjectionPushDown, I
> > think
> > > we
> > > > >> need to make some trade-offs. As a connector developer, the
> > stability
> > > of
> > > > >> the interface is very important. If there are no unresolved bugs,
> I
> > > > >> personally do not recommend modifying the interface. However,
> when I
> > > > first
> > > > >> read the code of SupportsProjectionPushDown, the design of int[][]
> > was
> > > > >> very
> > > > >> confusing for me, and it took me a long time to understand it by
> > > running
> > > > >> specify UT tests. Therefore, in terms of the design of this
> > interface
> > > > and
> > > > >> the consistency between different interfaces, there is indeed room
> > for
> > > > >> improvement.
> > > > >>
> > > > >> Thanks,
> > > > >> Yunhong Zheng (Swuferhong)
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> Becket Qin  wrote on Thu, Aug 3, 2023, 07:44:
> > > > >>
> > > > >> > Hi Jark,
> > > > >> >
> > > > >> > If the FieldReferenceExpression contains an int[] to support a
> > > nested
> > > > >> field
> > > > >> > reference, List<FieldReferenceExpression> (or
> > > > >> FieldReferenceExpression[])
> > > > >> > and int[][] are actually equivalent. If we are designing this
> from
> > > > >> scratch,
> > > > >> > personally I prefer using List<FieldReferenceExpression> for
> > > > >> consistency,
> > > > >> > i.e. always resolving everything to expressions for users.
> > > Projection
> > > > >> is a
> > > > >> > simpler case, but should not be a special case. This avoids
> doing
> > > the
> > > > >> same
> > > > >> > thing in different ways which is also a confusion to the users.
> To
> > > me,
> > > > >> the
> > > > >> > int[][] format would become kind of a technical debt after we
> > extend
> > > > the
> > > > >> > FieldReferenceExpression. Although we don't have to address it
> > right
> > > > >> away
> > > > >> > in the same FLIP, this kind of debt accumulates over time and
> > makes
> > > > the
> > > > >> > project harder to learn and maintain. So, personally I prefer to
> > > > address
> > > > >> > these technical debts as soon as possible.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Jiangjie (Becket) Qin
> > > > >> >
> > > > >> > On Wed, Aug 2, 2023 

Re: FLINK-20767 - Support for nested fields filter push down

2023-08-16 Thread Venkatakrishnan Sowrirajan
Forgot to share the link -
https://lists.apache.org/thread/686bhgwrrb4xmbfzlk60szwxos4z64t7 in the
last email.

Regards
Venkata krishnan


On Wed, Aug 16, 2023 at 11:55 AM Venkatakrishnan Sowrirajan <
vsowr...@asu.edu> wrote:

> Btw, this is the FLIP proposal discussion thread. Please share your
> thoughts. Thanks.
>
> Regards
> Venkata krishnan
>
>
> On Sun, Aug 13, 2023 at 6:35 AM liu ron  wrote:
>
>> Hi, Venkata krishnan
>>
>> Thanks for driving this work, look forward to your FLIP.
>>
>> Best,
>> Ron
>>
>> Venkatakrishnan Sowrirajan  wrote on Sun, Aug 13, 2023, 14:34:
>>
>> > Thanks Yunhong. That's correct. I am able to make it work locally.
>> > Currently, in the process of writing a FLIP for the necessary changes to
>> > the SupportsFilterPushDown API to support nested fields filter push
>> down.
>> >
>> > Regards
>> > Venkata krishnan
>> >
>> >
>> > On Mon, Aug 7, 2023 at 8:28 PM yh z  wrote:
>> >
>> > > Hi Venkatakrishnan,
>> > > Sorry for the late reply. I have looked at the code and feel like you
>> > need
>> > > to modify the logic of the
>> > > ExpressionConverter.visit(FieldReferenceExpression expression) method
>> to
>> > > support nested types,
>> > > which are not currently supported in the current code.
>> > >
>> > > Regards,
>> > > Yunhong Zheng (Swuferhong)
>> > >
>> > > Venkatakrishnan Sowrirajan  wrote on Mon, Aug 7, 2023, 13:30:
>> > >
>> > > > (Sorry, I pressed send too early)
>> > > >
>> > > > Thanks for the help @zhengyunhon...@gmail.com.
>> > > >
>> > > > Agree on not changing the API as much as possible as well as wrt
>> > > > simplifying Projection pushdown with nested fields eventually as
>> well.
>> > > >
>> > > > In terms of the code itself, currently I am trying to leverage the
>> > > > FieldReferenceExpression to also handle nested fields for filter
>> push
>> > > down.
>> > > > But where I am currently struggling to make progress is, once the
>> > filters
>> > > > are pushed to the table source itself, in
>> > > >
>> > PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable
>> > > > there is a conversion from a List (containing
>> > > > FieldReferenceExpression) to the List itself.
>> > > >
>> > > > If you have some pointers for that, please let me know. Thanks.
>> > > >
>> > > > Regards
>> > > > Venkata krishnan
>> > > >
>> > > >
>> > > > On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <
>> > > > vsowr...@asu.edu>
>> > > > wrote:
>> > > >
>> > > > > Thanks @zhengyunhon...@gmail.com
>> > > > > Regards
>> > > > > Venkata krishnan
>> > > > >
>> > > > >
>> > > > > On Sun, Aug 6, 2023 at 6:16 PM yh z 
>> > wrote:
>> > > > >
>> > > > >> Hi, Venkatakrishnan,
>> > > > >> I think this is a very useful feature. I have been focusing on
>> the
>> > > > >> development of the flink-table-planner module recently, so if you
>> > need
>> > > > >> some
>> > > > >> help, I can assist you in completing the development of some
>> > sub-tasks
>> > > > or
>> > > > >> code review.
>> > > > >>
>> > > > >> Returning to the design itself, I think it's necessary to modify
>> > > > >> FieldReferenceExpression or re-implement a
>> > > > NestedFieldReferenceExpression.
>> > > > >> As for modifying the interface of SupportsProjectionPushDown, I
>> > think
>> > > we
>> > > > >> need to make some trade-offs. As a connector developer, the
>> > stability
>> > > of
>> > > > >> the interface is very important. If there are no unresolved
>> bugs, I
>> > > > >> personally do not recommend modifying the interface. However,
>> when I
>> > > > first
>> > > > >> read the code of SupportsProjectionPushDown, the design of
>> int[][]
>> > was
>> > > > >> very
>> > > > >> confusing for me, and it took me a long time to understand it by
>> > > running
>> > > > >> specify UT tests. Therefore, in terms of the design of this
>> > interface
>> > > > and
>> > > > >> the consistency between different interfaces, there is indeed
>> room
>> > for
>> > > > >> improvement.
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Yunhong Zheng (Swuferhong)
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> Becket Qin  wrote on Thu, Aug 3, 2023, 07:44:
>> > > > >>
>> > > > >> > Hi Jark,
>> > > > >> >
>> > > > >> > If the FieldReferenceExpression contains an int[] to support a
>> > > nested
>> > > > >> field
>> > > > >> > reference, List<FieldReferenceExpression> (or
>> > > > >> FieldReferenceExpression[])
>> > > > >> > and int[][] are actually equivalent. If we are designing this
>> from
>> > > > >> scratch,
>> > > > >> > personally I prefer using List<FieldReferenceExpression> for
>> > > > >> consistency,
>> > > > >> > i.e. always resolving everything to expressions for users.
>> > > Projection
>> > > > >> is a
>> > > > >> > simpler case, but should not be a special case. This avoids
>> doing
>> > > the
>> > > > >> same
>> > > > >> > thing in different ways which is also a confusion to the
>> users. To
>> > > me,
>> > > > >> the
>> > > > >> > int[][] format would become kind of a technical debt after we
>> > extend
>> > > > the
>> > > > >> > FieldReferenceExpression. Although we don't have to address it
>> >

[jira] [Created] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme

2023-08-16 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32884:
---

 Summary: PyFlink remote execution should support URLs with paths 
and https scheme
 Key: FLINK-32884
 URL: https://issues.apache.org/jira/browse/FLINK-32884
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Runtime / REST
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov


Currently, `SUBMIT_ARGS` only supports the `remote -m http://<host>:<port>` 
format. For local execution it works fine (`SUBMIT_ARGS=remote -m 
http://localhost:8081`), but it does not support placing the JobManager behind 
a proxy or using an Ingress for routing to a specific Flink cluster based on 
the URL path. In the current scenario, it expects the JobManager to serve 
PyFlink jobs at the `http://<host>:<port>/v1/jobs` endpoint. Mapping to a 
non-root location, such as 
`https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs`,
 is not supported.

Since RestClusterClient talks to the JobManager via its REST endpoint, the 
right format for `SUBMIT_ARGS` is a URL with a path (with support for the 
https scheme as well).

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
flink-core/src/main/java/org/apache/flink/util/NetUtils.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution

2023-08-16 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-32885:
---

 Summary: Refactoring: Moving UrlPrefixDecorator into flink-clients 
so it can be used by RestClusterClient for PyFlink remote execution
 Key: FLINK-32885
 URL: https://issues.apache.org/jira/browse/FLINK-32885
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Table SQL / Gateway
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov


UrlPrefixDecorator was introduced in the `flink-sql-gateway` module, which has 
a dependency on the `flink-clients` module. RestClusterClient will also need 
to use UrlPrefixDecorator to support PyFlink remote execution. I will refactor 
the related classes to achieve this.

I intend to change these classes in a backward-compatible way:

flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32886) Issue with volumeMounts when creating OLM for Flink Operator 1.6.0

2023-08-16 Thread James Busche (Jira)
James Busche created FLINK-32886:


 Summary: Issue with volumeMounts when creating OLM for Flink 
Operator 1.6.0
 Key: FLINK-32886
 URL: https://issues.apache.org/jira/browse/FLINK-32886
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: James Busche


I noticed a volumeMount problem when trying to deploy the OLM CSV for the 1.6.0 
Flink Kubernetes Operator (following the directions from [OLM Verification of 
a Flink Kubernetes Operator 
Release|https://cwiki.apache.org/confluence/display/FLINK/OLM+Verification+of+a+Flink+Kubernetes+Operator+Release]).

 

oc describe csv

...

Warning  InstallComponentFailed  46s (x7 over 49s)  operator-lifecycle-manager  
install strategy failed: Deployment.apps "flink-kubernetes-operator" is 
invalid: [spec.template.spec.volumes[2].name: Duplicate value: "keystore", 
spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
"flink-artifacts-volume"]

 

My current workaround is to change [line 
88|https://github.com/apache/flink-kubernetes-operator/blob/main/tools/olm/docker-entry.sh#L88]
 to look like this:

yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[1] = {"name": "flink-artifacts-volume","emptyDir": {}}' "${CSV_FILE}"
yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[2] = {"name": "keystore","emptyDir": {}}' "${CSV_FILE}"

 

And then the operator deploys without error:

oc get csv
NAME                               DISPLAY                     VERSION   REPLACES                           PHASE
flink-kubernetes-operator.v1.6.0   Flink Kubernetes Operator   1.6.0     flink-kubernetes-operator.v1.5.0   Succeeded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs

2023-08-16 Thread Becket Qin
Hi Ron,

Isn't the cluster (session or per job) only using the execution.attached to
determine whether the client is attached? If so, the client can always
include the information of whether it's an attached client or not in the
JobSubmissionRequestBody, right? For a shared session cluster, there could
be multiple clients submitting jobs to it. These clients may or may not be
attached. A static execution.attached configuration for the session cluster
does not work in this case, right?
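A minimal sketch of that idea (hypothetical — the clientAttached field and these names are assumptions, not Flink's actual REST request schema):

```java
// Hypothetical sketch: carry each client's attach-ness in the job submission
// request, so a shared session cluster learns it per job instead of from one
// static execution.attached value. Field names here are assumptions.
public class JobSubmitRequest {
    final String jobGraphFileName;
    final boolean clientAttached; // per-submission flag, set by each client

    JobSubmitRequest(String jobGraphFileName, boolean clientAttached) {
        this.jobGraphFileName = jobGraphFileName;
        this.clientAttached = clientAttached;
    }

    public static void main(String[] args) {
        // Two clients submitting to the same session cluster can differ:
        JobSubmitRequest attached = new JobSubmitRequest("jobA.bin", true);
        JobSubmitRequest detached = new JobSubmitRequest("jobB.bin", false);
        System.out.println(attached.clientAttached + " " + detached.clientAttached);
    }
}
```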

The current problem of execution.attached is that it is not always honored.
For example, if a session cluster was started with execution.attached set
to false. And a client submits a job later to that session cluster with
execution.attached set to true. In this case, the cluster won't (and
shouldn't) shut down after the job finishes or the attached client loses
connection. So, in fact, the execution.attached configuration is only
honored by the client, but not the cluster. Therefore, I think removing it
makes sense.

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 17, 2023 at 12:31 AM liu ron  wrote:

> Hi, Jiangjie
>
> Sorry for the late reply. Thank you for such a detailed response. As you say,
> there are three behaviours here for users and I agree with you. The goal of
> this FLIP is to clarify the behaviour of the client side, which I also
> agree with. However, as weihua said, the config execution.attached is not
> only for per-job mode, but also for session mode, but the FLIP says that
> this is only for per-job mode, and this config will be removed in the
> future because the per-job mode has been deprecated. I don't think this is
> correct and we should change the description in the corresponding section
> of the FLIP. Since execution.attached is used in session mode, there is a
> compatibility issue here if we change it directly to
> client.attached.after.submission, and I think we should make this clear in
> the FLIP.
>
> Best,
> Ron
>
> Becket Qin  wrote on Monday, August 14, 2023 at 20:33:
>
> > Hi Ron and Weihua,
> >
> > Thanks for the feedback.
> >
> > There seem to be three user-sensible behaviors that we are talking about:
> >
> > 1. The behavior on the client side, i.e. whether blocking until the job
> > finishes or not.
> >
> > 2. The behavior of the submitted job, whether to stop the job execution if
> the
> > client is detached from the Flink cluster, i.e. whether to bind the
> lifecycle
> > of the job with the connection status of the attached client. For
> example,
> > one might want to keep a batch job running until finish even after the
> > client connection is lost. But it makes sense to stop the job upon client
> > connection lost if the job invokes collect() on a streaming job.
> >
> > 3. The behavior of the Flink cluster (JM and TMs), whether to shut down the
> > Flink cluster if the client is detached from the Flink cluster, i.e.
> > whether to bind the cluster lifecycle to the job lifecycle. For dedicated
> > clusters (application cluster or dedicated session clusters), the
> lifecycle
> > of the cluster should be bound with the job lifecycle. But for shared
> > session clusters, the lifecycle of the Flink cluster should be
> independent
> > of the jobs running in it.
> >
> > As we can see, these three behaviors are sort of independent, the current
> > configurations fail to support all the combinations of wanted behaviors.
> > Ideally there should be three separate configurations, for example:
> > - client.attached.after.submission and client.heartbeat.timeout control
> the
> > behavior on the client side.
> > - jobmanager.cancel-on-attached-client-exit controls the behavior of the
> > job when an attached client lost connection. The client heartbeat timeout
> > and attach-ness will be also passed to the JM upon job submission.
> > - cluster.shutdown-on-first-job-finishes *(*or
> > jobmanager.shutdown-cluster-after-job-finishes) controls the cluster
> > behavior after the job finishes normally / abnormally. This is a cluster
> > level setting instead of a job level setting. Therefore it can only be
> set
> > when launching the cluster.
> >
> > The current code sort of combines config 2 and 3 into
> > execution.shutdown-on-attached-exit.
> > This assumes that the life cycle of the cluster is the same as the job's
> when
> > the client is attached. This FLIP does not intend to change that, but
> using
> > the execution.attached config for the client behavior control looks
> > misleading. So this FLIP proposes to replace it with a more intuitive
> > config of client.attached.after.submission. This makes it clear that it
> is
> > a configuration controlling the client side behavior, instead of the
> > execution of the job.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> >
> > On Thu, Aug 10, 2023 at 10:34 PM Weihua Hu 
> wrote:
> >
> > > Hi Allison
> > >
> > > Thanks for driving this FLIP. It's a valuable feature for batch jobs.
> > > This helps keep "Drop Per-Job Mode [1]" going.
> > >
> > > +1 for this proposal.
> > >
> > > Howeve

Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs

2023-08-16 Thread liu ron
Hi, Jiangjie

Thanks for your detailed explanation, I got your point. If the
execution.attached is only used for client currently, removing it also make
sense to me.

Best,
Ron

Becket Qin  wrote on Thursday, August 17, 2023 at 07:37:

> Hi Ron,
>
> Isn't the cluster (session or per job) only using the execution.attached to
> determine whether the client is attached? If so, the client can always
> include the information of whether it's an attached client or not in the
> JobSubmissionRequestBody, right? For a shared session cluster, there could
> be multiple clients submitting jobs to it. These clients may or may not be
> attached. A static execution.attached configuration for the session cluster
> does not work in this case, right?
>
> The current problem of execution.attached is that it is not always honored.
> For example, if a session cluster was started with execution.attached set
> to false. And a client submits a job later to that session cluster with
> execution.attached set to true. In this case, the cluster won't (and
> shouldn't) shut down after the job finishes or the attached client loses
> connection. So, in fact, the execution.attached configuration is only
> honored by the client, but not the cluster. Therefore, I think removing it
> makes sense.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 17, 2023 at 12:31 AM liu ron  wrote:
>
> > Hi, Jiangjie
> >
> > Sorry for the late reply. Thank you for such a detailed response. As you say,
> > there are three behaviours here for users and I agree with you. The goal
> of
> > this FLIP is to clarify the behaviour of the client side, which I also
> > agree with. However, as weihua said, the config execution.attached is not
> > only for per-job mode, but also for session mode, but the FLIP says that
> > this is only for per-job mode, and this config will be removed in the
> > future because the per-job mode has been deprecated. I don't think this
> is
> > correct and we should change the description in the corresponding section
> > of the FLIP. Since execution.attached is used in session mode, there is a
> > compatibility issue here if we change it directly to
> > client.attached.after.submission, and I think we should make this clear
> in
> > the FLIP.
> >
> > Best,
> > Ron
> >
> > Becket Qin  wrote on Monday, August 14, 2023 at 20:33:
> >
> > > Hi Ron and Weihua,
> > >
> > > Thanks for the feedback.
> > >
> > > There seem to be three user-sensible behaviors that we are talking about:
> > >
> > > 1. The behavior on the client side, i.e. whether blocking until the job
> > > finishes or not.
> > >
> > > 2. The behavior of the submitted job, whether to stop the job execution if
> > the
> > > client is detached from the Flink cluster, i.e. whether to bind the
> > lifecycle
> > > of the job with the connection status of the attached client. For
> > example,
> > > one might want to keep a batch job running until finish even after the
> > > client connection is lost. But it makes sense to stop the job upon
> client
> > > connection lost if the job invokes collect() on a streaming job.
> > >
> > > 3. The behavior of the Flink cluster (JM and TMs), whether to shut down the
> > > Flink cluster if the client is detached from the Flink cluster, i.e.
> > > whether to bind the cluster lifecycle to the job lifecycle. For
> dedicated
> > > clusters (application cluster or dedicated session clusters), the
> > lifecycle
> > > of the cluster should be bound with the job lifecycle. But for shared
> > > session clusters, the lifecycle of the Flink cluster should be
> > independent
> > > of the jobs running in it.
> > >
> > > As we can see, these three behaviors are sort of independent, the
> current
> > > configurations fail to support all the combinations of wanted behaviors.
> > > Ideally there should be three separate configurations, for example:
> > > - client.attached.after.submission and client.heartbeat.timeout control
> > the
> > > behavior on the client side.
> > > - jobmanager.cancel-on-attached-client-exit controls the behavior of
> the
> > > job when an attached client lost connection. The client heartbeat
> timeout
> > > and attach-ness will be also passed to the JM upon job submission.
> > > - cluster.shutdown-on-first-job-finishes *(*or
> > > jobmanager.shutdown-cluster-after-job-finishes) controls the cluster
> > > behavior after the job finishes normally / abnormally. This is a
> cluster
> > > level setting instead of a job level setting. Therefore it can only be
> > set
> > > when launching the cluster.
> > >
> > > The current code sort of combines config 2 and 3 into
> > > execution.shutdown-on-attached-exit.
> > > This assumes that the life cycle of the cluster is the same as the job's
> > when
> > > the client is attached. This FLIP does not intend to change that, but
> > using
> > > the execution.attached config for the client behavior control looks
> > > misleading. So this FLIP proposes to replace it with a more intuitive
> > > config of client.attached.after.submission. This makes it clear that it
> > is
> > 

[jira] [Created] (FLINK-32887) SourceCoordinatorContext#workerExecutor may cause task initializing slowly

2023-08-16 Thread liang jie (Jira)
liang jie created FLINK-32887:
-

 Summary: SourceCoordinatorContext#workerExecutor may cause task 
initializing slowly 
 Key: FLINK-32887
 URL: https://issues.apache.org/jira/browse/FLINK-32887
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common, Runtime / Coordination
Affects Versions: 1.15.2
Reporter: liang jie


SourceCoordinatorContext#workerExecutor is typically used to calculate the 
partitions of a source task and is implemented as a ScheduledExecutorService 
with only 1 thread (hard-coded). Tasks that calculate partitions are executed 
through 'workerExecutor.scheduleAtFixedRate'.
--- 
In some cases, for example, the 'getSubscribedPartitions' method will take 
quite a long time (e.g. 5 min) because lots of topics are included in the same 
task, requests to outer systems time out, etc., while 
partitionDiscoveryInterval is set to a short interval, e.g. 1 min.
In this case, 'getSubscribedPartitions' tasks will be triggered repeatedly and 
queued in the workerExecutor's queue while the first 'getSubscribedPartitions' 
task is still running, which causes the 'checkPartitionChanges' tasks to be 
queued as well. Each 'checkPartitionChanges' task then needs to wait about 
25 min (5 × the 'getSubscribedPartitions' execution duration) before it is 
executed.
---
In my view, the tasks of workerExecutor should be scheduled with a fixed delay 
rather than at a fixed rate, because there is no point in repeatedly executing 
'getSubscribedPartitions' tasks without a 'checkPartitionChanges' execution in 
between.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)