Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread arjun s
Hi team, Thank you for your response. Could you please provide a sample regex (source.path.regex-pattern) for the following scenarios: (1) matching filenames that start with "flink", e.g. flink_2023_11_08.csv; (2) matching filenames that end with "flink.csv", e.g. customer_2023_11_08_flink.csv. Thanks and rega
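A minimal sketch of candidate patterns for the two scenarios above. It assumes that the whole absolute path must match, following Yu Chen's reply in this thread that the pattern "should be matched with the absolute file path". That is Java `Matcher.matches()`-style semantics, mirrored here with Python's `re.fullmatch`. The paths under /data/input are hypothetical. These simple patterns use the same syntax in Java regex, so the strings should carry over to `source.path.regex-pattern` unchanged, but please verify against your Flink version.

```python
import re

# Assumption: the connector tests the pattern against the entire absolute path,
# so re.fullmatch (whole-string match) is used to mirror that behaviour.

# Scenario 1: the file name (last path segment) starts with "flink".
STARTS_WITH_FLINK = r".*/flink[^/]*"
# Scenario 2: the file name ends with "flink.csv" (the dot is escaped).
ENDS_WITH_FLINK_CSV = r".*/[^/]*flink\.csv"


def matches(pattern: str, absolute_path: str) -> bool:
    """Return True if the pattern matches the entire absolute path."""
    return re.fullmatch(pattern, absolute_path) is not None


if __name__ == "__main__":
    # Hypothetical example paths.
    for path in [
        "/data/input/flink_2023_11_08.csv",
        "/data/input/customer_2023_11_08_flink.csv",
        "/data/input/customer_2023_11_08.csv",
    ]:
        print(path, matches(STARTS_WITH_FLINK, path), matches(ENDS_WITH_FLINK_CSV, path))
```

The `[^/]*` part keeps the match inside the file name, so a directory such as /data/flink_dir/ does not accidentally satisfy the "starts with flink" pattern.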

Re: Disable flink old checkpoint clean

2023-11-07 Thread Jinzhong Li
Hi Yang, I think there is no configuration option that allows users to disable checkpoint file cleanup at runtime. Does your Flink application use incremental checkpoints? 1) If yes, I think leveraging S3's lifecycle management to clean up checkpoint files is not safe, because it may acciden

Re: state.checkpoints.num-retained is set to 1, but the number of checkpoints in the TaskManager keeps growing and consuming memory. How can this be fixed?

2023-11-07 Thread Yu Chen
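Hi 嘉贤, This is not what we would expect. Was the job manually cancelled at some point? In that case, Flink's default behavior is RETAIN_ON_CANCELLATION, and the checkpoints have to be cleaned up manually. If you want checkpoints to be cleaned up after the job is cancelled, consider adjusting the parameter execution.checkpointing.externalized-checkpoint-retention[1]. [1] http://stream.devops.sit.xiaohongshu.com/docs/red/docs/deployment/config/#execution-checkpointing-extern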

Re: state.checkpoints.num-retained is set to 1, but the number of checkpoints in the TaskManager keeps growing and consuming memory. How can this be fixed?

2023-11-07 Thread Yu Chen
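Hi 嘉贤, What the Flink Web UI shows is the checkpoint history. The state.checkpoints.num-retained parameter controls how many checkpoints are kept in the checkpoint storage: Flink deletes checkpoint files from the checkpoint storage on a rolling basis, but the records shown in the Flink Web UI are not removed in the process (you can confirm this by checking the address in the Path of the corresponding checkpoint record). Also, if you are using the Heap StateBackend, state is stored in memory and checkpoints are flushed to files. The memory growth is most likely caused by the job itself rather than by the historical Chec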

Re: [DISCUSS] FLINK-32737: At-least-once sink connector for Snowflake

2023-11-07 Thread Mohsen Rezaei
This is now available as an open-source project: https://github.com/deltastreaminc/flink-connector-snowflake The first version, 1.0.7-1.17, is published on Maven Central. On Tue, Aug 8, 2023 at 10:56 PM Mohsen Rezaei wrote: > Hi all, > > I'm in the process of writing a Snowflake sink con

Re: Flink operator autoscaler scaling down

2023-11-07 Thread Yang LI
Hi Gyula, Thank you for the feedback! With your permission, I plan to integrate the implementation into the flink-kubernetes-operator-autoscaler module to test it in my environment, and subsequently perhaps contribute these changes back to the community by submitting a pull request to the GitHub repository in

Re: Disable flink old checkpoint clean

2023-11-07 Thread Yang LI
Hi Martijn, We're currently utilizing flink-s3-fs-presto. After reviewing the flink-s3-fs-hadoop source code, I believe we would encounter similar issues with it as well. When we say, 'The purpose of a checkpoint, in principle, is that Flink manages its lifecycle,' I think it implies that the au

Re: Disable flink old checkpoint clean

2023-11-07 Thread Yang LI
Hi Junrui, Currently, we have configured our Flink cluster with execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION and state.checkpoints.num-retained: 10. However, with this setup Flink starts deleting the oldest checkpoint once we exceed 10. Typically, by the time substantia
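The rolling deletion described above can be pictured with a deliberately simplified model: completed checkpoints arrive in order, and once more than `num_retained` are held, the oldest is subsumed and removed. This is a hypothetical illustration, not Flink's checkpoint-store code. In particular it ignores incremental checkpoints, where shared files are reference-counted and only discarded once no retained checkpoint still uses them.

```python
from collections import deque
from typing import Iterable, List, Tuple


def simulate_retention(num_retained: int, completed: Iterable[int]) -> Tuple[List[int], List[int]]:
    """Simplified model of state.checkpoints.num-retained (hypothetical helper).

    Completed checkpoint IDs arrive in order; whenever more than
    `num_retained` are held, the oldest one is dropped.
    Returns (retained IDs, deleted IDs in deletion order).
    """
    if num_retained < 1:
        raise ValueError("num_retained must be at least 1")
    retained: deque = deque()
    deleted: List[int] = []
    for checkpoint_id in completed:
        retained.append(checkpoint_id)
        if len(retained) > num_retained:
            # Oldest completed checkpoint is subsumed by the newer ones.
            deleted.append(retained.popleft())
    return list(retained), deleted
```

With `num_retained=10` and checkpoints 1 to 12, checkpoints 1 and 2 are dropped and 3 to 12 remain, which matches the behaviour Yang describes: raising the value only delays the first deletion, it does not disable it.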

Re: Flink operator autoscaler scaling down

2023-11-07 Thread Gyula Fóra
Sounds like a lot of work for very little gain to me. If you really feel that there is room for improvement in the current implementation, it may be simpler to fix that. Gyula On Tue, 7 Nov 2023 at 01:20, Yang LI wrote: > Thanks for the information! > > I haven't tested Kubernetes's buil

Re: Disable flink old checkpoint clean

2023-11-07 Thread Martijn Visser
Ah, I actually mixed up checkpoints and savepoints, sorry. The purpose of a checkpoint, in principle, is that Flink manages its lifecycle. Which S3 interface are you using for the checkpoint storage? On Tue, Nov 7, 2023 at 6:39 PM Martijn Visser wrote: > > Hi Yang, > > If you use the NO_CLAIM mode, F

Re: Disable flink old checkpoint clean

2023-11-07 Thread Martijn Visser
Hi Yang, If you use the NO_CLAIM mode, Flink will not assume ownership of a snapshot and will leave it up to the user to delete it. See the blog [1] for more details. Best regards, Martijn [1] https://flink.apache.org/2022/05/06/improvements-to-flink-operations-snapshots-ownership-and-savepoint-f

[PyFlink] Collect multiple elements in CoProcessFunction

2023-11-07 Thread Alexander Fedulov
The Java ProcessFunction API defines a clear way to collect data via the Collector object. The PyFlink documentation also refers to the Collector [1], but it is not passed to the function and cannot be found anywhere in the PyFlink source code. How can multiple elements be collected? Is "yield"
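One pattern worth trying, offered as an assumption rather than a confirmed answer: PyFlink's own DataStream examples emit results by writing the process method as a generator (with `yield`) instead of calling a Collector. The sketch below mimics that shape for a two-input function whose methods are named after PyFlink's `CoProcessFunction.process_element1`/`process_element2`. It does not import pyflink; the class `SplitAndTagSketch` and the `drain` helper are hypothetical stand-ins so the code runs anywhere.

```python
from typing import Any, Iterator, List, Tuple


class SplitAndTagSketch:
    """Hypothetical stand-in for a PyFlink CoProcessFunction subclass.

    Each method is a generator: every `yield` emits one output element,
    so a single input can produce zero, one, or many outputs.
    """

    def process_element1(self, value: str, ctx: Any) -> Iterator[Tuple[str, Any]]:
        # Emit one element per whitespace-separated word of the first input.
        for word in value.split():
            yield ("left", word)

    def process_element2(self, value: str, ctx: Any) -> Iterator[Tuple[str, Any]]:
        # Emit two elements for every element of the second input.
        yield ("right", value)
        yield ("right-length", len(value))


def drain(fn: SplitAndTagSketch, first: List[str], second: List[str]) -> List[Tuple[str, Any]]:
    """Hypothetical driver that consumes the generators the way a runtime might.

    For simplicity it processes all first-input elements before the second
    input; a real job would interleave the two streams.
    """
    out: List[Tuple[str, Any]] = []
    for value in first:
        out.extend(fn.process_element1(value, None))
    for value in second:
        out.extend(fn.process_element2(value, None))
    return out
```

If PyFlink behaves like this sketch, an input that yields nothing simply produces no output, which covers the filter-like case as well.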

Re: Disable flink old checkpoint clean

2023-11-07 Thread Junrui Lee
Hi Yang, You can try configuring "execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION"[1] and increasing the value of "state.checkpoints.num-retained"[2] to retain more checkpoints. Here are the official documentation links for more details: [1] https://nightlies.

Disable flink old checkpoint clean

2023-11-07 Thread Yang LI
Dear Flink Community, In our Flink application, we persist checkpoints to AWS S3. Recently, during periods of high job parallelism and traffic, we've experienced checkpoint failures. Upon investigating, it appears these may be related to S3 delete object requests interrupting checkpoint re-uploads

Flink metrics to Prometheus on Kubernetes

2023-11-07 Thread Raihan Sunny via user
Hi, I have a few Flink jobs running on Kubernetes using the Flink Kubernetes Operator. By following the documentation [1] I was able to set up monitoring for the Operator itself. As for the jobs themselves, I'm a bit confused about how to properly set it up. Here's my FlinkDeployment configuration

Re: Error in /jars/upload curl request

2023-11-07 Thread Tauseef Janvekar
Hi Chen, Thanks for your help. It worked fine. Thanks, Tauseef On Tue, 7 Nov 2023 at 16:20, Yu Chen wrote: > Hi Tauseef, > > That's really dependent on the environment you're actually running in. But > I'm guessing you're using ingress to route your requests to the JM POD. > If so, I'd suggest

Re: Error in /jars/upload curl request

2023-11-07 Thread Yu Chen
Hi Tauseef, That's really dependent on the environment you're actually running in. But I'm guessing you're using ingress to route your requests to the JM POD. If so, I'd suggest you adjust the value of nginx.ingress.kubernetes.io/proxy-body-size. Following is an example. ``` apiVersion: extens

Re: Error in /jars/upload curl request

2023-11-07 Thread Tauseef Janvekar
Hi Chen, We are not using nginx anywhere, either on the server (Kubernetes cluster) or on my client (my local machine). I'm not sure how to proceed. Thanks, Tauseef On Tue, 7 Nov 2023 at 13:36, Yu Chen wrote: > Hi Tauseef, > > The error was caused by the nginx configuration and was not a flink > pro

Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread Yu Chen
Hi Arjun, As stated in the documentation, "This regex pattern should be matched with the absolute file path." Therefore, you should adjust your regular expression to match absolute paths. Please let me know if there are any other problems. Best, Yu Chen > On 7 Nov 2023, at 18:11, arjun s wrote: > > Hi Chen,
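To illustrate this point with arjun's pattern '^customer.*' from the message below: the sketch uses a hypothetical absolute path, with Python's `re` standing in for Java regex, and shows that the pattern cannot match an absolute path because the path begins with "/". A pattern that allows a directory prefix does match. '^customer.*' fails under both whole-string and search-style matching, so this conclusion does not depend on which matching mode the connector actually uses.

```python
import re

# Hypothetical absolute path of a file in the monitored directory.
PATH = "/data/input/customer_2023_11_08.csv"

# Pattern from the question: anchors "customer" at the very start of the string.
ORIGINAL_PATTERN = r"^customer.*"
# Adjusted pattern: any directory prefix, then a file name starting with "customer".
ADJUSTED_PATTERN = r".*/customer[^/]*"


def whole_path_match(pattern: str, absolute_path: str) -> bool:
    """True if the pattern matches the entire absolute path."""
    return re.fullmatch(pattern, absolute_path) is not None


def search_match(pattern: str, absolute_path: str) -> bool:
    """True if the pattern matches anywhere in the path (search semantics)."""
    return re.search(pattern, absolute_path) is not None


if __name__ == "__main__":
    for name, pattern in [("original", ORIGINAL_PATTERN), ("adjusted", ADJUSTED_PATTERN)]:
        print(name, whole_path_match(pattern, PATH), search_match(pattern, PATH))
```

The original pattern would work only if it were applied to the bare file name, which is why it looks correct when tested in isolation.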

Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread arjun s
Hi Chen, I attempted to configure the 'source.path.regex-pattern' property in the table settings as '^customer.*' to ensure that the Flink job only processes file names starting with "customer" in the specified directory. However, it appears that this configuration is not producing the expected res

Re: Flink operator autoscaler scaling down

2023-11-07 Thread Yang LI
Thanks for the information! I haven't tested Kubernetes's built-in rollback mechanism yet. I feel like I could create another independent operator that monitors the Flink application's JVM memory and triggers a rollback. Another solution I would like to discuss is also to implement an independent operator.

Re: Checkpoints are not triggering when S3 is unavailable

2023-11-07 Thread Evgeniy Lyutikov
Hi, thanks for the reply. We write to Kafka as the sink. It's not clear why Flink stops triggering new checkpoints, which we would expect to simply time out. From: Hangxiang Yu Sent: 6 November 2023 11:02:08 To: Evgeniy Lyutikov; user@flink.apache.org Subject: R

Re: Error in /jars/upload curl request

2023-11-07 Thread Yu Chen
Hi Tauseef, The error was caused by the nginx configuration and was not a Flink problem. You can find many related solutions on the web [1]. Best, Yu Chen [1] https://stackoverflow.com/questions/24306335/413-request-entity-too-large-file-upload-issue > On 7 Nov 2023, at 15:14, Tauseef Janvekar wrote: >