[jira] [Created] (FLINK-32880) Redundant taskManager should be replenished in FineGrainedSlotManager
xiangyu feng created FLINK-32880:

Summary: Redundant taskManager should be replenished in FineGrainedSlotManager
Key: FLINK-32880
URL: https://issues.apache.org/jira/browse/FLINK-32880
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Reporter: xiangyu feng

Currently, if you are using the FineGrainedSlotManager and a redundant taskmanager exits abnormally, no replacement taskmanager is started.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-32881) Client supports making savepoints in detach mode
Renxiang Zhou created FLINK-32881:

Summary: Client supports making savepoints in detach mode
Key: FLINK-32881
URL: https://issues.apache.org/jira/browse/FLINK-32881
Project: Flink
Issue Type: Improvement
Components: API / State Processor, Client / Job Submission
Affects Versions: 1.19.0
Reporter: Renxiang Zhou
Fix For: 1.19.0
Attachments: image-2023-08-16-17-14-34-740.png, image-2023-08-16-17-14-44-212.png

When triggering a savepoint using the command-line tool, the client needs to wait for the job to finish creating the savepoint before it can exit. For jobs with large state, the savepoint creation process can be time-consuming, leading to the following problems:

1. Platform users may need to manage thousands of Flink tasks on a single client machine. With the current savepoint triggering mode, all savepoint creation threads on that machine have to wait for the job to finish the snapshot, resulting in significant resource waste.
2. If the savepoint creation time exceeds the client's timeout duration, the client will throw a timeout exception and report that triggering the savepoint failed. Since different jobs have varying savepoint durations, it is difficult to tune the client's timeout parameter.

Therefore, we propose adding a detach mode for triggering savepoints on the client side, similar to the detach mode behavior when submitting jobs. Here are some specific details:

1. The savepoint UUID will be generated on the client side. After successfully triggering the savepoint, the client immediately returns the UUID information.
2. Add a "dump-pending-savepoints" API that allows the client to check whether the triggered savepoint has been successfully created.

By implementing these changes, the client can detach from the savepoint creation process, reducing resource waste and providing a way to check the status of savepoint creation.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
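To make the proposed flow concrete, here is a minimal sketch from the client's perspective. The interface and method names are illustrative only (they mirror the two changes described in the ticket) and are not an existing Flink API:

```java
import java.util.Set;
import java.util.UUID;

// Hypothetical client API mirroring the proposal: triggering returns
// immediately, and a separate "dump-pending-savepoints" style call
// reports which savepoints are still in progress.
interface DetachedSavepointClient {
    void triggerSavepointDetached(String jobId, String savepointUuid, String targetDirectory);

    Set<String> dumpPendingSavepoints(String jobId);
}

class DetachedSavepointExample {

    // Trigger the savepoint and return right away with the client-generated UUID.
    static String trigger(DetachedSavepointClient client, String jobId, String targetDirectory) {
        String savepointUuid = UUID.randomUUID().toString(); // generated on the client side
        client.triggerSavepointDetached(jobId, savepointUuid, targetDirectory);
        return savepointUuid; // the client can exit immediately after this
    }

    // Later (possibly from a different client), check whether the savepoint completed.
    static boolean isCompleted(DetachedSavepointClient client, String jobId, String savepointUuid) {
        return !client.dumpPendingSavepoints(jobId).contains(savepointUuid);
    }
}
```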
Re: [ANNOUNCE] Updates to Flink's external connector CI workflows
This is great! Thanks for working on this.

Best
Etienne

On 15/06/2023 at 13:19, Martijn Visser wrote:

Hi all,

I would like to inform you of two changes that have been made to the shared CI workflow that's used for Flink's externalized connectors.

1. Up until now, weekly builds were running to validate that connector code (still) works with Flink. However, these builds were only running for code on the "main" branch of the connector, and not for the release branches of the connector (like v3.0 for Elasticsearch, v1.0 for Opensearch etc.). This was tracked under https://issues.apache.org/jira/browse/FLINK-31923. That issue has now been fixed, with the Github Action workflow now accepting a map with arrays, which can contain a combination of Flink versions to test for and the connector branch it should test. See https://github.com/apache/flink-connector-jdbc/blob/main/.github/workflows/weekly.yml#L28-L47 for an example from the Flink JDBC connector. This change has already been applied to the externalized connectors GCP PubSub, RabbitMQ, JDBC, Pulsar, MongoDB, Opensearch, Cassandra, Elasticsearch. AWS is pending the merging of the PR. For Kafka, Hive and HBase, since they haven't finished externalization, this isn't applicable to them yet.

2. When working on the debugging of a problem with the JDBC connector, one of the things that was needed to debug that problem was the ability to see the JVM thread dump. With https://issues.apache.org/jira/browse/FLINK-32331 now completed, every failed CI run will have a JVM thread dump. You can see the implementation for that in https://github.com/apache/flink-connector-shared-utils/blob/ci_utils/.github/workflows/ci.yml#L161-L195

Best regards,

Martijn
[jira] [Created] (FLINK-32882) FineGrainedSlotManager cause NPE when clear pending taskmanager twice
Weihua Hu created FLINK-32882:

Summary: FineGrainedSlotManager cause NPE when clear pending taskmanager twice
Key: FLINK-32882
URL: https://issues.apache.org/jira/browse/FLINK-32882
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Weihua Hu
Assignee: Weihua Hu
Attachments: image-2023-08-16-17-34-32-619.png

When a job finishes, we call both processResourceRequirements(ResourceRequirements.empty) and clearResourceRequirements. Both methods trigger taskManagerTracker.clearPendingAllocationsOfJob(jobId) to release pending task managers early. The second call causes an NPE; we need to add a safety net for PendingTaskManager#clearPendingAllocationsOfJob.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
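A minimal sketch of the kind of safety net the ticket suggests, with illustrative types standing in for the actual PendingTaskManager internals (the real fix may differ): a second clear for the same job becomes a no-op instead of a NullPointerException.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for the per-job pending-allocation bookkeeping.
class PendingAllocationTracker {

    private final Map<String, Set<String>> pendingAllocationsPerJob = new HashMap<>();

    void addPendingAllocation(String jobId, String allocationId) {
        pendingAllocationsPerJob.computeIfAbsent(jobId, id -> new HashSet<>()).add(allocationId);
    }

    // Safety net: tolerate being called twice for the same job. Without the
    // null check, the second call would dereference the entry that the first
    // call already removed.
    void clearPendingAllocationsOfJob(String jobId) {
        Set<String> allocations = pendingAllocationsPerJob.remove(jobId);
        if (allocations == null) {
            return; // already cleared by an earlier call; nothing to release
        }
        allocations.forEach(this::releaseAllocation);
    }

    private void releaseAllocation(String allocationId) {
        // release the resources tracked for this allocation
    }
}
```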
Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data
Hi Dong,

The Operators API is unfortunately also a public-facing API, and the APIs that we will add there should also be marked `@Experimental` IMO. The config options should also be marked as experimental (both annotated @Experimental and noted as such in the docs, if the @Experimental annotation is not automatically mentioned in the docs).

> Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?

We should do this independently from marking the APIs/config options as `@Experimental`.

Best,
Piotrek

On Fri, 11 Aug 2023 at 14:55, Dong Lin wrote:
> Hi Piotr,
>
> Thanks for the reply!
>
> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski wrote:
> > Hi,
> >
> > Sorry for the long delay in responding!
> >
> > > Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?
> >
> > I think it's fine. It would be best to mark this feature as experimental, and we say that the config keys or the default values might change in the future.
>
> In general I agree we can mark APIs that determine "whether to enable dynamic switching between stream/batch mode" as experimental.
>
> However, I am not sure we have such an API yet. The APIs added in this FLIP are intended to be used by operator developers rather than end users. End users can enable this capability by setting execution.checkpointing.interval-during-backlog = Long.MAX_VALUE and using a source which might implicitly set the backlog status (e.g. HybridSource). So execution.checkpointing.interval-during-backlog is the only user-facing API that can always control whether this feature can be used.
>
> However, execution.checkpointing.interval-during-backlog itself is not tied to FLIP-327.
>
> Do you mean we should set checkpointing.interval-during-backlog as experimental? Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?
>
> Best,
> Dong
>
> > > Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?
> >
> > Sure, we can do that.
> >
> > Best,
> > Piotrek
> >
> > On Sun, 23 Jul 2023 at 14:32, Dong Lin wrote:
> > > Hi Piotr,
> > >
> > > Thanks a lot for the explanation. Please see my reply inline.
> > >
> > > On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > Hi Dong,
> > > >
> > > > Thanks a lot for the answers. I can now only briefly answer your last email.
> > > >
> > > > > It is possible that spilling to disks might cause larger overhead. IMO it is an orthogonal issue already existing in Flink. This is because a Flink job running in batch mode might also be slower than its throughput in stream mode due to the same reason.
> > > >
> > > > Yes, I know, but the thing that worries me is that previously only a user alone could decide whether to use batch mode or streaming, and in practice one user would rarely (if ever) use both for the same problem/job/query. If his intention was to eventually process live data, he was using streaming even if there was a large backlog at the start (apart from some very few power users).
> > > >
> > > > With this change, we want to introduce a mode that would be switching back and forth between streaming and "batch in streaming" automatically. So a potential performance regression would be much more visible and painful at the same time. If a batch query runs slower than it could, it's kind of fine, as it will end at some point. If a streaming query during large back pressure, or maybe a temporary load spike, switches to batch processing, that's a bigger deal. Especially if the batch processing mode will not be able to actually even handle the normal load after the load spike. In that case, the job could never recover from the backpressure/backlog mode.
> > >
> > > I understand you are concerned with the risk of performance regression introduced due to switching to batch mode.
> > >
> > > After thinking about this more, I think this existing proposal meets the minimum requirement of "not introducing regression for existing jobs". The reason is that even if batch mode can be slower than stream mode for some operators in some cases, this is an optional feature that will only be enabled if a user explicitly overrides the newly introduced config to non-default values. Existing jobs that simply upgrade their Flink li
[DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints
Platform providers may want to disable hints completely for security reasons.

Currently, there is a configuration to disable the OPTIONS hint -
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled

However, there is no configuration available to disable QUERY hints -
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints

The proposal is to add a new configuration:

Name: table.query-options.enabled
Description: Enables or disables the QUERY hint. If disabled, an exception will be thrown if any QUERY hints are specified.
Note: The default value will be set to true.
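As an illustration, a minimal sketch of how the proposed option would be used. The option name table.query-options.enabled is only proposed here and does not exist in Flink today (the existing OPTIONS-hint switch is table.dynamic-table-options.enabled); the tables and the BROADCAST hint are just examples:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DisableQueryHintsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql("CREATE TABLE t1 (id INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql("CREATE TABLE t2 (id INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

        // Proposed option (not yet in Flink): with QUERY hints disabled, the
        // statement below would throw, because it carries a BROADCAST join hint.
        tEnv.getConfig().set("table.query-options.enabled", "false");
        tEnv.executeSql("SELECT /*+ BROADCAST(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id");
    }
}
```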
[jira] [Created] (FLINK-32883) Support for standby task managers
Tomoyuki NAKAMURA created FLINK-32883:

Summary: Support for standby task managers
Key: FLINK-32883
URL: https://issues.apache.org/jira/browse/FLINK-32883
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: Tomoyuki NAKAMURA

https://docs.ververica.com/user_guide/application_operations/deployments/scaling.html#run-with-standby-taskmanager

I would like to be able to support standby task managers, because on K8s, pods are often evicted or deleted due to node failure or autoscaling. With the current implementation, it is not possible to set up a standby task manager, and jobs cannot run until all task managers are up and running. If standby task managers were supported, jobs could continue to run without downtime, even if a task manager is unexpectedly deleted.

https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L370-L380

If the number of task manager replicas is set, the job's parallelism setting is ignored. It should be possible to support standby task managers by automatically setting the parallelism to replicas * task slots only when the job's parallelism is not set (i.e. 0), and using the configured parallelism when it is set (see the sketch below).

If this change looks good, I will send a PR on GitHub.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
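A minimal sketch of the parallelism rule proposed above, with illustrative method and parameter names (the actual FlinkConfigBuilder change may differ):

```java
// Sketch of the proposed rule: an explicitly configured parallelism wins;
// otherwise derive it from replicas * slots per task manager. Standby task
// managers then arise naturally whenever replicas * slotsPerTaskManager
// exceeds the configured parallelism.
public class StandbyParallelismSketch {

    static int effectiveParallelism(int configuredParallelism, int tmReplicas, int slotsPerTaskManager) {
        if (configuredParallelism > 0) {
            // The user set parallelism explicitly; keep it and let the extra
            // replicas act as standby task managers.
            return configuredParallelism;
        }
        // Parallelism not set (i.e. 0): fall back to replicas * task slots.
        return tmReplicas * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // 3 replicas with 4 slots each, parallelism left unset -> 12
        System.out.println(effectiveParallelism(0, 3, 4));
        // parallelism explicitly 8 -> 8, leaving one TM's worth of slots standby
        System.out.println(effectiveParallelism(8, 3, 4));
    }
}
```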
Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs
Hi, Jiangjie

Sorry for the late reply. Thank you for such a detailed response. As you say, there are three behaviours here for users, and I agree with you. The goal of this FLIP is to clarify the behaviour of the client side, which I also agree with. However, as Weihua said, the config execution.attached is not only for per-job mode, but also for session mode, while the FLIP says that it is only for per-job mode and that this config will be removed in the future because per-job mode has been deprecated. I don't think this is correct, and we should change the description in the corresponding section of the FLIP. Since execution.attached is used in session mode, there is a compatibility issue if we change it directly to client.attached.after.submission, and I think we should make this clear in the FLIP.

Best,
Ron

On Mon, Aug 14, 2023 at 20:33, Becket Qin wrote:
> Hi Ron and Weihua,
>
> Thanks for the feedback.
>
> There seem to be three user-visible behaviors that we are talking about:
>
> 1. The behavior on the client side, i.e. whether to block until the job finishes or not.
>
> 2. The behavior of the submitted job, i.e. whether to stop the job execution if the client is detached from the Flink cluster, i.e. whether to bind the lifecycle of the job to the connection status of the attached client. For example, one might want to keep a batch job running until it finishes even after the client connection is lost. But it makes sense to stop the job upon client connection loss if the job invokes collect() on a streaming job.
>
> 3. The behavior of the Flink cluster (JM and TMs), i.e. whether to shut down the Flink cluster if the client is detached from the Flink cluster, i.e. whether to bind the cluster lifecycle to the job lifecycle. For dedicated clusters (application clusters or dedicated session clusters), the lifecycle of the cluster should be bound to the job lifecycle. But for shared session clusters, the lifecycle of the Flink cluster should be independent of the jobs running in it.
>
> As we can see, these three behaviors are more or less independent, and the current configurations fail to support all the combinations of wanted behaviors. Ideally there should be three separate configurations, for example:
> - client.attached.after.submission and client.heartbeat.timeout control the behavior on the client side.
> - jobmanager.cancel-on-attached-client-exit controls the behavior of the job when an attached client loses its connection. The client heartbeat timeout and attached-ness will also be passed to the JM upon job submission.
> - cluster.shutdown-on-first-job-finishes (or jobmanager.shutdown-cluster-after-job-finishes) controls the cluster behavior after the job finishes normally / abnormally. This is a cluster-level setting instead of a job-level setting. Therefore it can only be set when launching the cluster.
>
> The current code sort of combines configs 2 and 3 into execution.shutdown-on-attached-exit. This assumes the life cycle of the cluster is the same as the job's when the client is attached. This FLIP does not intend to change that, but using the execution.attached config for the client behavior control looks misleading. So this FLIP proposes to replace it with a more intuitive config, client.attached.after.submission. This makes it clear that it is a configuration controlling the client-side behavior, instead of the execution of the job.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 10, 2023 at 10:34 PM Weihua Hu wrote:
> > Hi Allison
> >
> > Thanks for driving this FLIP. It's a valuable feature for batch jobs. This helps keep "Drop Per-Job Mode [1]" going.
> >
> > +1 for this proposal.
> >
> > However, it seems that the change in this FLIP is not detailed enough. I have a few questions.
> >
> > 1. The config 'execution.attached' is not only used in per-job mode, but also in session mode to shut down the cluster. IMHO, it's better to keep this option name.
> >
> > 2. This FLIP only mentions YARN mode. I believe this feature should work in both YARN and Kubernetes mode.
> >
> > 3. Within the attached mode, we support two features: execution.shutdown-on-attached-exit and client.heartbeat.timeout. These should also be taken into account.
> >
> > 4. The Application Mode will shut down once the job has been completed. So, if we use the flink client to poll job status via REST API for attached mode, there is a chance that the client will not be able to retrieve the job finish status. Perhaps FLINK-24113 [3] will help with this.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-26000
> > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> > [3] https://issues.apache.org/jira/browse/FLINK-24113
> >
> > Best,
> > Weihua
> >
> > On Thu, Aug 10, 2023 at 10:47 AM liu ron wrote:
> > > Hi, Allison
> > >
> > > Thanks for driving this propo
Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints
Hi,

Thanks for driving this proposal.

Can you explain why you would need to disable query hints because of security issues? I don't really understand how query hints affect security.

Best,
Ron

On Wed, Aug 16, 2023 at 23:59, Bonnie Arogyam Varghese wrote:
> Platform providers may want to disable hints completely for security reasons.
>
> Currently, there is a configuration to disable the OPTIONS hint -
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled
>
> However, there is no configuration available to disable QUERY hints -
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints
>
> The proposal is to add a new configuration:
>
> Name: table.query-options.enabled
> Description: Enables or disables the QUERY hint. If disabled, an exception will be thrown if any QUERY hints are specified.
> Note: The default value will be set to true.
Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints
Hi Liu,

Options hints could be a security concern since users can override settings. Query hints, on the other hand, could affect performance. Since we have a config to disable the OPTIONS hint, I'm suggesting we also have a config to disable QUERY hints.

On Wed, Aug 16, 2023 at 9:41 AM liu ron wrote:
> Hi,
>
> Thanks for driving this proposal.
>
> Can you explain why you would need to disable query hints because of security issues? I don't really understand how query hints affect security.
>
> Best,
> Ron
>
> On Wed, Aug 16, 2023 at 23:59, Bonnie Arogyam Varghese wrote:
> > Platform providers may want to disable hints completely for security reasons.
> >
> > Currently, there is a configuration to disable the OPTIONS hint -
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled
> >
> > However, there is no configuration available to disable QUERY hints -
> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints
> >
> > The proposal is to add a new configuration:
> >
> > Name: table.query-options.enabled
> > Description: Enables or disables the QUERY hint. If disabled, an exception will be thrown if any QUERY hints are specified.
> > Note: The default value will be set to true.
Re: FLINK-20767 - Support for nested fields filter push down
Btw, this is the FLIP proposal discussion thread. Please share your thoughts. Thanks.

Regards
Venkata krishnan

On Sun, Aug 13, 2023 at 6:35 AM liu ron wrote:
> Hi, Venkata krishnan
>
> Thanks for driving this work, look forward to your FLIP.
>
> Best,
> Ron
>
> On Sun, Aug 13, 2023 at 14:34, Venkatakrishnan Sowrirajan wrote:
> > Thanks Yunhong. That's correct. I am able to make it work locally. Currently, I am in the process of writing a FLIP for the necessary changes to the SupportsFilterPushDown API to support nested fields filter push down.
> >
> > Regards
> > Venkata krishnan
> >
> > On Mon, Aug 7, 2023 at 8:28 PM yh z wrote:
> > > Hi Venkatakrishnan,
> > > Sorry for the late reply. I have looked at the code and feel like you need to modify the logic of the ExpressionConverter.visit(FieldReferenceExpression expression) method to support nested types, which are not supported in the current code.
> > >
> > > Regards,
> > > Yunhong Zheng (Swuferhong)
> > >
> > > On Mon, Aug 7, 2023 at 13:30, Venkatakrishnan Sowrirajan wrote:
> > > > (Sorry, I pressed send too early)
> > > >
> > > > Thanks for the help @zhengyunhon...@gmail.com.
> > > >
> > > > Agree on not changing the API as much as possible, as well as on eventually simplifying projection pushdown with nested fields.
> > > >
> > > > In terms of the code itself, currently I am trying to leverage the FieldReferenceExpression to also handle nested fields for filter push down. But where I am currently struggling to make progress is: once the filters are pushed to the table source itself, in PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable there is a conversion from List<ResolvedExpression> (which can contain FieldReferenceExpression) to the List itself.
> > > >
> > > > If you have some pointers for that, please let me know. Thanks.
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > > On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > Thanks @zhengyunhon...@gmail.com
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > > On Sun, Aug 6, 2023 at 6:16 PM yh z wrote:
> > > > > > Hi, Venkatakrishnan,
> > > > > > I think this is a very useful feature. I have been focusing on the development of the flink-table-planner module recently, so if you need some help, I can assist you in completing the development of some sub-tasks or code review.
> > > > > >
> > > > > > Returning to the design itself, I think it's necessary to modify FieldReferenceExpression or re-implement a NestedFieldReferenceExpression. As for modifying the interface of SupportsProjectionPushDown, I think we need to make some trade-offs. As a connector developer, the stability of the interface is very important. If there are no unresolved bugs, I personally do not recommend modifying the interface. However, when I first read the code of SupportsProjectionPushDown, the design of int[][] was very confusing for me, and it took me a long time to understand it by running specific UT tests. Therefore, in terms of the design of this interface and the consistency between different interfaces, there is indeed room for improvement.
> > > > > >
> > > > > > Thanks,
> > > > > > Yunhong Zheng (Swuferhong)
> > > > > >
> > > > > > On Thu, Aug 3, 2023 at 07:44, Becket Qin wrote:
> > > > > > > Hi Jark,
> > > > > > >
> > > > > > > If the FieldReferenceExpression contains an int[] to support a nested field reference, List<FieldReferenceExpression> (or FieldReferenceExpression[]) and int[][] are actually equivalent. If we are designing this from scratch, personally I prefer using List<FieldReferenceExpression> for consistency, i.e. always resolving everything to expressions for users. Projection is a simpler case, but should not be a special case. This avoids doing the same thing in different ways, which is also a confusion to the users. To me, the int[][] format would become kind of a technical debt after we extend the FieldReferenceExpression. Although we don't have to address it right away in the same FLIP, this kind of debt accumulates over time and makes the project harder to learn and maintain. So, personally I prefer to address these technical debts as soon as possible.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On Wed, Aug 2, 2023
Re: FLINK-20767 - Support for nested fields filter push down
Forgot to share the link - https://lists.apache.org/thread/686bhgwrrb4xmbfzlk60szwxos4z64t7 - in the last email.

Regards
Venkata krishnan

On Wed, Aug 16, 2023 at 11:55 AM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> Btw, this is the FLIP proposal discussion thread. Please share your thoughts. Thanks.
>
> Regards
> Venkata krishnan
>
> On Sun, Aug 13, 2023 at 6:35 AM liu ron wrote:
> > Hi, Venkata krishnan
> >
> > Thanks for driving this work, look forward to your FLIP.
> >
> > Best,
> > Ron
> >
> > On Sun, Aug 13, 2023 at 14:34, Venkatakrishnan Sowrirajan wrote:
> > > Thanks Yunhong. That's correct. I am able to make it work locally. Currently, I am in the process of writing a FLIP for the necessary changes to the SupportsFilterPushDown API to support nested fields filter push down.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > > On Mon, Aug 7, 2023 at 8:28 PM yh z wrote:
> > > > Hi Venkatakrishnan,
> > > > Sorry for the late reply. I have looked at the code and feel like you need to modify the logic of the ExpressionConverter.visit(FieldReferenceExpression expression) method to support nested types, which are not supported in the current code.
> > > >
> > > > Regards,
> > > > Yunhong Zheng (Swuferhong)
> > > >
> > > > On Mon, Aug 7, 2023 at 13:30, Venkatakrishnan Sowrirajan wrote:
> > > > > (Sorry, I pressed send too early)
> > > > >
> > > > > Thanks for the help @zhengyunhon...@gmail.com.
> > > > >
> > > > > Agree on not changing the API as much as possible, as well as on eventually simplifying projection pushdown with nested fields.
> > > > >
> > > > > In terms of the code itself, currently I am trying to leverage the FieldReferenceExpression to also handle nested fields for filter push down. But where I am currently struggling to make progress is: once the filters are pushed to the table source itself, in PushFilterIntoSourceScanRuleBase#resolveFiltersAndCreateTableSourceTable there is a conversion from List<ResolvedExpression> (which can contain FieldReferenceExpression) to the List itself.
> > > > >
> > > > > If you have some pointers for that, please let me know. Thanks.
> > > > >
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > > On Sun, Aug 6, 2023 at 10:23 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > > > Thanks @zhengyunhon...@gmail.com
> > > > > > Regards
> > > > > > Venkata krishnan
> > > > > >
> > > > > > On Sun, Aug 6, 2023 at 6:16 PM yh z wrote:
> > > > > > > Hi, Venkatakrishnan,
> > > > > > > I think this is a very useful feature. I have been focusing on the development of the flink-table-planner module recently, so if you need some help, I can assist you in completing the development of some sub-tasks or code review.
> > > > > > >
> > > > > > > Returning to the design itself, I think it's necessary to modify FieldReferenceExpression or re-implement a NestedFieldReferenceExpression. As for modifying the interface of SupportsProjectionPushDown, I think we need to make some trade-offs. As a connector developer, the stability of the interface is very important. If there are no unresolved bugs, I personally do not recommend modifying the interface. However, when I first read the code of SupportsProjectionPushDown, the design of int[][] was very confusing for me, and it took me a long time to understand it by running specific UT tests. Therefore, in terms of the design of this interface and the consistency between different interfaces, there is indeed room for improvement.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yunhong Zheng (Swuferhong)
> > > > > > >
> > > > > > > On Thu, Aug 3, 2023 at 07:44, Becket Qin wrote:
> > > > > > > > Hi Jark,
> > > > > > > >
> > > > > > > > If the FieldReferenceExpression contains an int[] to support a nested field reference, List<FieldReferenceExpression> (or FieldReferenceExpression[]) and int[][] are actually equivalent. If we are designing this from scratch, personally I prefer using List<FieldReferenceExpression> for consistency, i.e. always resolving everything to expressions for users. Projection is a simpler case, but should not be a special case. This avoids doing the same thing in different ways, which is also a confusion to the users. To me, the int[][] format would become kind of a technical debt after we extend the FieldReferenceExpression. Although we don't have to address it
[jira] [Created] (FLINK-32884) PyFlink remote execution should support URLs with paths and https scheme
Elkhan Dadashov created FLINK-32884:

Summary: PyFlink remote execution should support URLs with paths and https scheme
Key: FLINK-32884
URL: https://issues.apache.org/jira/browse/FLINK-32884
Project: Flink
Issue Type: New Feature
Components: Client / Job Submission, Runtime / REST
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov

Currently, only the `SUBMIT_ARGS=remote -m http://<host>:<port>` format is supported. For local execution it works fine (`SUBMIT_ARGS=remote -m http://localhost:8081`), but it does not support placing the JobManager behind a proxy or using an Ingress for routing to a specific Flink cluster based on the URL path. In the current scenario, it expects the JobManager to accept PyFlink jobs at the `http://<host>:<port>/v1/jobs` endpoint. Mapping to a non-root location, such as `https://<host>:<port>/flink-clusters/namespace/flink_job_deployment/v1/jobs`, is not supported.

Since RestClusterClient talks to the JobManager via its REST endpoint, the right format for `SUBMIT_ARGS` is a URL with a path (with support for the https scheme as well).

I intend to change these classes in a backward-compatible way:
flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
flink-core/src/main/java/org/apache/flink/util/NetUtils.java

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
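For illustration, a sketch of what a path-aware target URL would carry, using java.net.URL and a made-up gateway hostname; this is only a sketch of the parsing, not the actual RestClusterClient change:

```java
import java.net.MalformedURLException;
import java.net.URL;

public class RestTargetSketch {
    public static void main(String[] args) throws MalformedURLException {
        // Hypothetical proxied target; today only scheme://host:port is honored.
        URL target = new URL("https://gateway.example.com:443/flink-clusters/ns/my-deployment");

        String scheme = target.getProtocol(); // "https"
        String host = target.getHost();       // "gateway.example.com"
        int port = target.getPort();          // 443
        String prefix = target.getPath();     // "/flink-clusters/ns/my-deployment"

        // With path support, REST calls would be issued against prefix + "/v1/jobs"
        // instead of the root "/v1/jobs".
        System.out.println(scheme + "://" + host + ":" + port + prefix + "/v1/jobs");
    }
}
```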
[jira] [Created] (FLINK-32885) Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
Elkhan Dadashov created FLINK-32885:

Summary: Refactoring: Moving UrlPrefixDecorator into flink-clients so it can be used by RestClusterClient for PyFlink remote execution
Key: FLINK-32885
URL: https://issues.apache.org/jira/browse/FLINK-32885
Project: Flink
Issue Type: New Feature
Components: Client / Job Submission, Table SQL / Gateway
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov

UrlPrefixDecorator was introduced in the `flink-sql-gateway` module, which has a dependency on the `flink-clients` module. RestClusterClient will also need to use UrlPrefixDecorator to support PyFlink remote execution. I will refactor the related classes to achieve this.

I intend to change these classes in a backward-compatible way:
flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/util/SQLGatewayUrlPrefixDecorator.java
flink-clients/src/main/java/org/apache/flink/client/program/rest/MonitoringAPIMessageHeaders.java
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-32886) Issue with volumeMounts when creating OLM for Flink Operator 1.6.0
James Busche created FLINK-32886:

Summary: Issue with volumeMounts when creating OLM for Flink Operator 1.6.0
Key: FLINK-32886
URL: https://issues.apache.org/jira/browse/FLINK-32886
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0
Reporter: James Busche

I notice a volumeMount problem when trying to deploy the OLM CSV for the 1.6.0 Flink Kubernetes Operator (following the directions from "OLM Verification of a Flink Kubernetes Operator Release", https://cwiki.apache.org/confluence/display/FLINK/OLM+Verification+of+a+Flink+Kubernetes+Operator+Release):

oc describe csv
...
Warning InstallComponentFailed 46s (x7 over 49s) operator-lifecycle-manager install strategy failed: Deployment.apps "flink-kubernetes-operator" is invalid: [spec.template.spec.volumes[2].name: Duplicate value: "keystore", spec.template.spec.containers[0].volumeMounts[1].name: Not found: "flink-artifacts-volume"]

My current workaround is to change line 88 of tools/olm/docker-entry.sh (https://github.com/apache/flink-kubernetes-operator/blob/main/tools/olm/docker-entry.sh#L88) to look like this:

yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[1] = {"name": "flink-artifacts-volume","emptyDir": {}}' "${CSV_FILE}"
yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[2] = {"name": "keystore","emptyDir": {}}' "${CSV_FILE}"

And then the operator deploys without error:

oc get csv
NAME                               DISPLAY                     VERSION   REPLACES                           PHASE
flink-kubernetes-operator.v1.6.0   Flink Kubernetes Operator   1.6.0     flink-kubernetes-operator.v1.5.0   Succeeded

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs
Hi Ron,

Isn't the cluster (session or per-job) only using execution.attached to determine whether the client is attached? If so, the client can always include the information of whether it's an attached client or not in the JobSubmissionRequestBody, right? For a shared session cluster, there could be multiple clients submitting jobs to it. These clients may or may not be attached. A static execution.attached configuration for the session cluster does not work in this case, right?

The current problem of execution.attached is that it is not always honored. For example, say a session cluster was started with execution.attached set to false, and a client later submits a job to that session cluster with execution.attached set to true. In this case, the cluster won't (and shouldn't) shut down after the job finishes or the attached client loses its connection. So, in fact, the execution.attached configuration is only honored by the client, but not the cluster. Therefore, I think removing it makes sense.

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 17, 2023 at 12:31 AM liu ron wrote:
> Hi, Jiangjie
>
> Sorry for the late reply. Thank you for such a detailed response. As you say, there are three behaviours here for users, and I agree with you. The goal of this FLIP is to clarify the behaviour of the client side, which I also agree with. However, as Weihua said, the config execution.attached is not only for per-job mode, but also for session mode, while the FLIP says that it is only for per-job mode and that this config will be removed in the future because per-job mode has been deprecated. I don't think this is correct, and we should change the description in the corresponding section of the FLIP. Since execution.attached is used in session mode, there is a compatibility issue if we change it directly to client.attached.after.submission, and I think we should make this clear in the FLIP.
>
> Best,
> Ron
>
> On Mon, Aug 14, 2023 at 20:33, Becket Qin wrote:
> > Hi Ron and Weihua,
> >
> > Thanks for the feedback.
> >
> > There seem to be three user-visible behaviors that we are talking about:
> >
> > 1. The behavior on the client side, i.e. whether to block until the job finishes or not.
> >
> > 2. The behavior of the submitted job, i.e. whether to stop the job execution if the client is detached from the Flink cluster, i.e. whether to bind the lifecycle of the job to the connection status of the attached client. For example, one might want to keep a batch job running until it finishes even after the client connection is lost. But it makes sense to stop the job upon client connection loss if the job invokes collect() on a streaming job.
> >
> > 3. The behavior of the Flink cluster (JM and TMs), i.e. whether to shut down the Flink cluster if the client is detached from the Flink cluster, i.e. whether to bind the cluster lifecycle to the job lifecycle. For dedicated clusters (application clusters or dedicated session clusters), the lifecycle of the cluster should be bound to the job lifecycle. But for shared session clusters, the lifecycle of the Flink cluster should be independent of the jobs running in it.
> >
> > As we can see, these three behaviors are more or less independent, and the current configurations fail to support all the combinations of wanted behaviors. Ideally there should be three separate configurations, for example:
> > - client.attached.after.submission and client.heartbeat.timeout control the behavior on the client side.
> > - jobmanager.cancel-on-attached-client-exit controls the behavior of the job when an attached client loses its connection. The client heartbeat timeout and attached-ness will also be passed to the JM upon job submission.
> > - cluster.shutdown-on-first-job-finishes (or jobmanager.shutdown-cluster-after-job-finishes) controls the cluster behavior after the job finishes normally / abnormally. This is a cluster-level setting instead of a job-level setting. Therefore it can only be set when launching the cluster.
> >
> > The current code sort of combines configs 2 and 3 into execution.shutdown-on-attached-exit. This assumes the life cycle of the cluster is the same as the job's when the client is attached. This FLIP does not intend to change that, but using the execution.attached config for the client behavior control looks misleading. So this FLIP proposes to replace it with a more intuitive config, client.attached.after.submission. This makes it clear that it is a configuration controlling the client-side behavior, instead of the execution of the job.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Aug 10, 2023 at 10:34 PM Weihua Hu wrote:
> > > Hi Allison
> > >
> > > Thanks for driving this FLIP. It's a valuable feature for batch jobs. This helps keep "Drop Per-Job Mode [1]" going.
> > >
> > > +1 for this proposal.
> > >
> > > Howeve
Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs
Hi, Jiangjie

Thanks for your detailed explanation, I got your point. If execution.attached is currently only used by the client, removing it also makes sense to me.

Best,
Ron

On Thu, Aug 17, 2023 at 07:37, Becket Qin wrote:
> Hi Ron,
>
> Isn't the cluster (session or per-job) only using execution.attached to determine whether the client is attached? If so, the client can always include the information of whether it's an attached client or not in the JobSubmissionRequestBody, right? For a shared session cluster, there could be multiple clients submitting jobs to it. These clients may or may not be attached. A static execution.attached configuration for the session cluster does not work in this case, right?
>
> The current problem of execution.attached is that it is not always honored. For example, say a session cluster was started with execution.attached set to false, and a client later submits a job to that session cluster with execution.attached set to true. In this case, the cluster won't (and shouldn't) shut down after the job finishes or the attached client loses its connection. So, in fact, the execution.attached configuration is only honored by the client, but not the cluster. Therefore, I think removing it makes sense.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 17, 2023 at 12:31 AM liu ron wrote:
> > Hi, Jiangjie
> >
> > Sorry for the late reply. Thank you for such a detailed response. As you say, there are three behaviours here for users, and I agree with you. The goal of this FLIP is to clarify the behaviour of the client side, which I also agree with. However, as Weihua said, the config execution.attached is not only for per-job mode, but also for session mode, while the FLIP says that it is only for per-job mode and that this config will be removed in the future because per-job mode has been deprecated. I don't think this is correct, and we should change the description in the corresponding section of the FLIP. Since execution.attached is used in session mode, there is a compatibility issue if we change it directly to client.attached.after.submission, and I think we should make this clear in the FLIP.
> >
> > Best,
> > Ron
> >
> > On Mon, Aug 14, 2023 at 20:33, Becket Qin wrote:
> > > Hi Ron and Weihua,
> > >
> > > Thanks for the feedback.
> > >
> > > There seem to be three user-visible behaviors that we are talking about:
> > >
> > > 1. The behavior on the client side, i.e. whether to block until the job finishes or not.
> > >
> > > 2. The behavior of the submitted job, i.e. whether to stop the job execution if the client is detached from the Flink cluster, i.e. whether to bind the lifecycle of the job to the connection status of the attached client. For example, one might want to keep a batch job running until it finishes even after the client connection is lost. But it makes sense to stop the job upon client connection loss if the job invokes collect() on a streaming job.
> > >
> > > 3. The behavior of the Flink cluster (JM and TMs), i.e. whether to shut down the Flink cluster if the client is detached from the Flink cluster, i.e. whether to bind the cluster lifecycle to the job lifecycle. For dedicated clusters (application clusters or dedicated session clusters), the lifecycle of the cluster should be bound to the job lifecycle. But for shared session clusters, the lifecycle of the Flink cluster should be independent of the jobs running in it.
> > >
> > > As we can see, these three behaviors are more or less independent, and the current configurations fail to support all the combinations of wanted behaviors. Ideally there should be three separate configurations, for example:
> > > - client.attached.after.submission and client.heartbeat.timeout control the behavior on the client side.
> > > - jobmanager.cancel-on-attached-client-exit controls the behavior of the job when an attached client loses its connection. The client heartbeat timeout and attached-ness will also be passed to the JM upon job submission.
> > > - cluster.shutdown-on-first-job-finishes (or jobmanager.shutdown-cluster-after-job-finishes) controls the cluster behavior after the job finishes normally / abnormally. This is a cluster-level setting instead of a job-level setting. Therefore it can only be set when launching the cluster.
> > >
> > > The current code sort of combines configs 2 and 3 into execution.shutdown-on-attached-exit. This assumes the life cycle of the cluster is the same as the job's when the client is attached. This FLIP does not intend to change that, but using the execution.attached config for the client behavior control looks misleading. So this FLIP proposes to replace it with a more intuitive config, client.attached.after.submission. This makes it clear that it is
[jira] [Created] (FLINK-32887) SourceCoordinatorContext#workerExecutor may cause task initializing slowly
liang jie created FLINK-32887:

Summary: SourceCoordinatorContext#workerExecutor may cause task initializing slowly
Key: FLINK-32887
URL: https://issues.apache.org/jira/browse/FLINK-32887
Project: Flink
Issue Type: Improvement
Components: Connectors / Common, Runtime / Coordination
Affects Versions: 1.15.2
Reporter: liang jie

SourceCoordinatorContext#workerExecutor is typically used to calculate the partitions of a source task and is implemented as a ScheduledExecutorService with only 1 thread (hard-coded). Tasks that calculate partitions are executed through the function workerExecutor.scheduleAtFixedRate.

In some cases, the 'getSubscribedPartitions' method can take quite a long time (e.g. 5 min), for example because lots of topics are included in the same task, or because requests to external systems time out, while partitionDiscoveryInterval is set to a short interval, e.g. 1 min. In this case, 'getSubscribedPartitions' runnables will be triggered repeatedly and queued in the workerExecutor's queue while the first 'getSubscribedPartitions' task is still running, which causes the 'checkPartitionChanges' tasks to be queued as well. Each 'checkPartitionChanges' task then needs to wait 25 min (5 * the 'getSubscribedPartitions' execution duration) before it is executed.

In my view, tasks of workerExecutor should be scheduled with a fixed delay rather than at a fixed rate, because there is no point in repeatedly executing 'getSubscribedPartitions' without a 'checkPartitionChanges' execution in between.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
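A minimal, runnable sketch of the difference between the two scheduling modes; discoverPartitions is a stand-in for the real partition-discovery work, not Flink code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateVsFixedDelay {

    // Stand-in for a slow 'getSubscribedPartitions'-style discovery call.
    static void discoverPartitions() {
        try {
            Thread.sleep(5_000); // pretend discovery takes 5s against a 1s period
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("discovery finished at " + System.nanoTime());
    }

    public static void main(String[] args) {
        ScheduledExecutorService worker = Executors.newScheduledThreadPool(1);

        // scheduleAtFixedRate: every period that elapses while a run is still in
        // progress makes the task overdue, so catch-up runs fire back-to-back and
        // starve other tasks queued on the same single-threaded executor.
        worker.scheduleAtFixedRate(FixedRateVsFixedDelay::discoverPartitions, 0, 1, TimeUnit.SECONDS);

        // scheduleWithFixedDelay (the behavior the ticket argues for): the next run
        // starts one full period after the previous one completes, so a slow
        // discovery cannot pile up redundant executions. Swap the call above for:
        // worker.scheduleWithFixedDelay(FixedRateVsFixedDelay::discoverPartitions, 0, 1, TimeUnit.SECONDS);
    }
}
```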