Re: [Question] Basic Python examples.wordcount on local FlinkRunner

2021-09-02 Thread Dian Fu
This seems more like a Beam issue, although it uses the Flink runner. It would be helpful to also send it to the Beam user mailing list. Regarding this issue itself, could you check whether input.txt is accessible in the Docker container? Regards, Dian > On Sep 3, 2021, at 5:19 AM, Adam Pearce wrote: > > Hello all,

Re: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread Yang Wang
Hi Alexis, Thanks for your valuable input. First, I want to share why Flink has to overwrite the resources defined in the pod template. You can find the fields that will be overwritten by Flink here[1]. I think the major reason is that Flink needs to ensure the consistency between Flink con
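To make the issue concrete, a user-supplied pod template might declare its own resources, only to have Flink replace them. The sketch below is illustrative, not taken from this thread; the authoritative list of overwritten fields is the documentation link [1] in the mail:

```yaml
# Sketch of a Flink pod template (pod-template.yaml). Flink derives the main
# container's resource requests/limits from its own options (e.g.
# taskmanager.memory.process.size, kubernetes.taskmanager.cpu), so the
# values written here may be overwritten and are not guaranteed to survive.
apiVersion: v1
kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  containers:
    - name: flink-main-container   # Flink identifies the main container by this name
      resources:
        requests:
          cpu: "0.5"
          memory: "1Gi"
        limits:
          cpu: "1"
          memory: "2Gi"
```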

Re: Triggers for windowed aggregations in Table API

2021-09-02 Thread Guowei Ma
Hi John, I agree with Caizhi that you might need to customize a window trigger, but with one small addition: you need to convert the Table to a DataStream first. Then you can customize the trigger of the window, because as far as I know, the Table API does not support custom windows yet. For details on ho

Re: Checkpointing failure, subtasks get stuck

2021-09-02 Thread JING ZHANG
Hi Xiangyu Su, Because of the lack of detailed information, I can only offer some troubleshooting ideas. I hope they are helpful. 1. Find out which checkpoint expired. You can find that in the Web UI [1] or in `jobmanager.log`. 2. Find out which operators had not yet finished the checkpoint when the checkp

Re: Flink on Kubernetes

2021-09-02 Thread Guowei Ma
Hi Julian, I notice that your configuration includes "restart-strategy.fixed-delay.attempts: 10". It means that the job will fail permanently after 10 failures, so maybe that is why the job does not restart again, and you could increase this value. But I am not sure if this is the root cause, so if this do
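The setting Guowei refers to lives in flink-conf.yaml; a sketch of a fixed-delay restart strategy with a larger attempt budget (the values below are illustrative, not a recommendation):

```yaml
# flink-conf.yaml (illustrative values): once the attempt budget is
# exhausted, the job fails permanently, so raise it if periodic restarts
# are expected in your environment.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 100   # the reported config used 10
restart-strategy.fixed-delay.delay: 10 s
```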

Re: Verifying max-parallelism value

2021-09-02 Thread Guowei Ma
Hi Niklas, As far as I know, the maximum parallelism is not currently displayed in the Web UI. Maximum parallelism is a concept at operator granularity, so I understand that it is a little difficult to display. However, each job can have its own default value; if it is not set, there is a calculation meth
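The calculation method Guowei alludes to can be sketched roughly as follows. This is a best-effort reconstruction of `KeyGroupRangeAssignment.computeDefaultMaxParallelism` from the Flink sources, not an official formula; verify against the Flink version you run:

```python
# Sketch of Flink's default max-parallelism (key-group count) calculation:
# roughly 1.5x the operator parallelism, rounded up to the next power of
# two, clamped into [128, 32768]. Reconstruction, not authoritative.

DEFAULT_LOWER_BOUND = 128   # minimum default max parallelism
UPPER_BOUND = 32768         # hard upper bound (1 << 15)

def round_up_to_power_of_two(x: int) -> int:
    """Smallest power of two >= x."""
    return 1 if x <= 1 else 1 << (x - 1).bit_length()

def compute_default_max_parallelism(operator_parallelism: int) -> int:
    candidate = round_up_to_power_of_two(
        operator_parallelism + operator_parallelism // 2)
    return min(max(candidate, DEFAULT_LOWER_BOUND), UPPER_BOUND)

print(compute_default_max_parallelism(4))      # small jobs get the floor: 128
print(compute_default_max_parallelism(200))    # 300 -> next power of two: 512
print(compute_default_max_parallelism(30000))  # capped at 32768
```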

Re: Triggers for windowed aggregations in Table API

2021-09-02 Thread Caizhi Weng
Hi! You might want to use a custom trigger to achieve this. Tumble windows use EventTimeTrigger by default. Flink has another built-in trigger called CountTrigger, but it fires only once every X records, ignoring event time completely. You might want to create your own trigger to combi
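The combination Caizhi describes (fire early once a count threshold is reached, and still fire when the watermark passes the window end) can be sketched as plain decision logic. The class and method names below are illustrative only; a real implementation extends Flink's `Trigger` interface:

```python
# Pure-Python sketch of a combined count-or-event-time trigger decision.
# Illustrative only: a real Flink trigger implements
# org.apache.flink.streaming.api.windowing.triggers.Trigger.
from dataclasses import dataclass, field

@dataclass
class CountOrEventTimeTrigger:
    max_count: int
    counts: dict = field(default_factory=dict)  # window end -> element count

    def on_element(self, window_end: int) -> str:
        """Called per element; fire early once the count threshold is hit."""
        self.counts[window_end] = self.counts.get(window_end, 0) + 1
        if self.counts[window_end] >= self.max_count:
            self.counts[window_end] = 0   # reset so the next batch can fire again
            return "FIRE"
        return "CONTINUE"

    def on_event_time(self, watermark: int, window_end: int) -> str:
        """Called as the watermark advances; fire once it reaches window end."""
        return "FIRE" if watermark >= window_end else "CONTINUE"

trigger = CountOrEventTimeTrigger(max_count=3)
print([trigger.on_element(window_end=1000) for _ in range(3)])
# ['CONTINUE', 'CONTINUE', 'FIRE']
print(trigger.on_event_time(1500, 1000))  # fires at/after window end
```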

Re: Verifying max-parallelism value

2021-09-02 Thread Caizhi Weng
Hi! Do you mean pipeline.max-parallelism or some other config option? If so, you should see it on the "Job Manager > Configuration" page of the Flink UI. Which config option are you setting, and how do you set it? On Fri, Sep 3, 2021 at 12:53 AM Niklas Wilcke wrote: > Hi Flink community, > > most likely I'm mis

Re: Reuse in Blink execution plan

2021-09-02 Thread Caizhi Weng
Hi! Reusing common sub-plans is an optimization in Flink. Flink really does reuse them at runtime, and the results of the reused tasks are calculated only once. On Thu, Sep 2, 2021 at 6:32 PM Vasily Melnik wrote: > > Hi all. > > Using SQL with the Blink planner for batch calculations, I see *Reused* > nodes in Op

Re: Use FlinkKafkaConsumer to synchronize multiple Kafka topics

2021-09-02 Thread Arvid Heise
Hi Yan, Afaik this is not directly supported, and it would be surprising to other users since it's a rather specific requirement. In fact, Flink delegates reading the topics to the Kafka consumer API, and I suspect that the warning you received is also coming from the Kafka consumer (I have not found a respect

Streaming Patterns and Best Practices - featuring Apache Flink

2021-09-02 Thread Devin Bost
I just released a new video that features Apache Flink in several design patterns: Streaming Patterns and Best Practices with Apache Pulsar for Enabling Machine Learning and Analytics. I thought it might be of interest to the Flink community. Devin G. Bost

Re: Deploying Stateful functions with an existing Ververica cluster

2021-09-02 Thread Barry Higgins
Hi Igal, Thank you for getting back so quickly. All of our applications are currently deployed onto the one Ververica cluster, so I would be quite keen to get the DataStream integration option evaluated (I am currently hitting an exception where the ObjectMapper in DefaultHttpRequestReplyClientSpe

Triggers for windowed aggregations in Table API

2021-09-02 Thread John Smith
Hi, Sorry if this has been answered previously, but I couldn't find any answer to the question and would appreciate any help. Context: Let's say I have a log stream in Kafka where message values have an *id* field along with a few other fields. I want to count the number of messages for each id fo

Flink on Kubernetes

2021-09-02 Thread Julian Cardarelli
Hello - We have implemented Flink on Kubernetes with Google Cloud Storage in a high-availability configuration as per the below configmap. Everything appears to be working normally, and state is being saved to GCS. However, every now and then - perhaps weekly or every other week - all of the submitte

Verifying max-parallelism value

2021-09-02 Thread Niklas Wilcke
Hi Flink community, Most likely I'm missing something, but I have failed to verify the setting of the max-parallelism (# of key groups). Is there a way to check the value for a job? I checked the following places without finding it: 1. Flink UI: Job Configuration 2. Flink UI: SubTasks of a Job 3. Taskma

Re: FLINK-14316 happens on version 1.13.2

2021-09-02 Thread Yun Gao
Hi Xiangyu, There might be different reasons for the "Job leader ... lost leadership" problem. Do you see the errors in the TM log? If so, the root cause might be that the connection between the TM and ZK was lost or timed out. Have you checked the GC status on the TM side? If the GC is ok, could

Re: Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

2021-09-02 Thread Nicolaus Weidner
Hi Praneeth, It does look like a failure constructing the serializer. Can you share the serialization config you use for the Kafka producer? In particular, are you using a custom serializer? Do you use any custom classloading configuration? Best regards, Nico On Wed, Sep 1, 2021 at 11:38 PM Pran

Re: Deploying Stateful functions with an existing Ververica cluster

2021-09-02 Thread Igal Shilman
Hi Barry, I've forwarded your email to the user mailing list, as it is more suitable here :-) Your question definitely makes sense, and let me try to provide you with some pointers: 1. The architecture that you've outlined has many advantages and is desirable if you can afford it. Some of them are

RE: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread Alexis Sarda-Espinosa
Just to provide my opinion, I find the idea of factors unintuitive for this specific case. When I’m working with Kubernetes resources and sizing, I have to think in absolute terms for all pods and define requests and limits with concrete values. Using factors for Flink means that I have to think

Re: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread spoon_lz
Hi Yang, I agree with you, but I think the limit-factor should be greater than or equal to 1, and defaulting to 1 is the better choice. If the default value is 1.5, the memory limit will exceed the actual physical memory of a node, which may result in OOM, machine downtime, or random pod deaths if the
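The concern about a limit-factor above 1 can be made concrete. With a hypothetical memory limit-factor of 1.5, the container limit exceeds what Flink requests, as in this sketch (values illustrative; the limit-factor options were still a proposal at the time of this thread):

```yaml
# Illustrative effect of a hypothetical 1.5 limit-factor: Flink requests
# 4Gi but the pod limit becomes 6Gi. Since the scheduler places pods by
# their requests, a node can be overcommitted on limits, risking OOM kills
# or node pressure -- the failure modes spoon_lz points out.
resources:
  requests:
    cpu: "1"
    memory: "4Gi"
  limits:
    cpu: "1.5"       # cpu request x 1.5
    memory: "6Gi"    # memory request x 1.5
```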

Re: Flink Performance Issue

2021-09-02 Thread Mohammed Kamaal
Hi Fabian, Just an update on Problem 2 (Caused by: org.apache.kafka.common.errors.NetworkException): it is resolved. It occurred because we exceeded the number of allowed partitions for the Kafka cluster (AWS MSK cluster). We have deleted unused topics and partitions to resolve the issue.

Reuse in Blink execution plan

2021-09-02 Thread Vasily Melnik
Hi all. Using SQL with the Blink planner for batch calculations, I see *Reused* nodes in the Optimized Execution Plan when performing self-join operations: == Optimized Execution Plan == Union(all=[true], union=[id, v, v0, w0$o0]) :- OverAggregate(orderBy=[id DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS B

Re: Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Till Rohrmann
Forwarding the discussion back to the user mailing list. On Thu, Sep 2, 2021 at 12:25 PM Till Rohrmann wrote: > The stack trace looks ok. This happens whenever the leader loses > leadership and this can have different reasons. What's more interesting is > what happens before and after and what's

Re: Checkpointing failure, subtasks get stuck

2021-09-02 Thread Till Rohrmann
Hi Xiangyu, Can you provide us with more information about your job, which state backend you are using and how you've configured the checkpointing? Can you also provide some information about the problematic checkpoints (e.g. alignment time, async/sync duration) that you find on the checkpoint det

Re: Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Till Rohrmann
Hi Xiangyu, Do you have the logs of the problematic test run available? Ideally, we can enable the DEBUG log level to get some more information. I think this information would be needed to figure out the problem. Cheers, Till On Thu, Sep 2, 2021 at 11:47 AM Xiangyu Su wrote: > Hello Everyone,

Checkpointing failure, subtasks get stuck

2021-09-02 Thread Xiangyu Su
Hello Everyone, Hello Till, We have been facing a checkpointing failure issue since version 1.9; currently we are using version 1.13.2. We use the filesystem (S3) state backend with a 10-minute checkpoint timeout; usually a checkpoint takes 10-30 seconds. But sometimes I have seen the job fail and restart d
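For context, the setup described above corresponds roughly to a configuration like the following sketch (illustrative values and a hypothetical bucket name, not the poster's actual config):

```yaml
# flink-conf.yaml sketch of the described setup (Flink 1.13-era option names)
state.backend: filesystem
state.checkpoints.dir: s3://my-bucket/checkpoints   # hypothetical bucket
execution.checkpointing.timeout: 10 min             # checkpoint expires after 10 minutes
execution.checkpointing.interval: 1 min             # illustrative interval
```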

Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Xiangyu Su
Hello Everyone, Hello Till, We upgraded Flink to 1.13.2, and we are randomly facing the "Job leader ... lost leadership" error; the job keeps restarting and failing... It behaves like this ticket: https://issues.apache.org/jira/browse/FLINK-14316 Did anybody have the same issue or any suggestions? Be

Re: De/Serialization API to tear-down user code

2021-09-02 Thread Dawid Wysakowicz
Hi Sergio, You can find the explanation of why we haven't added the close method in the corresponding JIRA ticket[1]: When adding a close() method to both DeserializationSchema and SerializationSchema with a default implementation, it breaks source compatibility if a user's class implements

Re: De/Serialization API to tear-down user code

2021-09-02 Thread Sergio Morales
Thank you for the answer. I’m using the (De)SerializationSchema in such a way that it has a reference to a custom class that manages some resources. In the open() method I’m able to init the resources accordingly, but it is really strange that despite providing an “open()” there is no counter-part