Async Performance

2021-10-25 Thread Sanket Agrawal
Hello Everyone, I am using a series of 6 Async Operators in my application. Each operator uses AsyncHttpClient to make an external API call. Each Async Operator makes only one call to the external API per message. Capacity is set to 1000 for each parallelism. Approximately, we are getting re

FlinkKafkaProducer deprecated in 1.14 but pyflink binding still present?

2021-10-25 Thread Francis Conroy
Looks like this got deprecated in 1.14 in favour of KafkaSink/KafkaSource but the python binding didn't get updated? Can someone confirm this? Francis Conroy

Re: How to execute multi SQL in one job

2021-10-25 Thread 刘建刚
Thanks very much. Forgive me for the simple question. I have found the doc in the latest code. My inner code is too old... Jake wrote on Tue, Oct 26, 2021 at 11:39 AM: > > Hi, you can do like this: > > ```java > > val statementSet = tableEnv.createStatementSet() > val insertSqlBuffer = ListBuffer.empty[String

High Availability on Kubernetes

2021-10-25 Thread Deshpande, Omkar
Hello, We are running Flink on Kubernetes (standalone) in application cluster mode. The job manager is deployed as a deployment. We only deploy one instance/replica of the job manager, so the leader election service is not required. We have also set Flink task execution retries to infinite. Do we st

Re: How to execute multi SQL in one job

2021-10-25 Thread Jake
Hi, you can do it like this: ```scala val statementSet = tableEnv.createStatementSet() val insertSqlBuffer = ListBuffer.empty[String] val calciteParser = new CalciteParser(SqlUtil.getSqlParserConfig(tableEnv.getConfig)) sqlArr .foreach(item => { println(item) val itemNode = cal

How to execute multi SQL in one job

2021-10-25 Thread 刘建刚
I have multiple batch SQL commands separated by semicolons (;). The SQL commands need to be executed in order (in other cases, the SQL commands may share sources or sinks). I want to execute them in one job. When I use tableEnv.executeSql(multiSQL), it throws errors. How can I execute them in one job
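
The question above involves a script of semicolon-separated statements that cannot be passed to executeSql as a single string. As a minimal sketch, assuming the script contains no SQL comments and uses only single-quoted string literals, the script could first be split into individual statements. The helper name `split_sql_statements` is hypothetical and is not part of Flink.

```python
from typing import List


def split_sql_statements(script: str) -> List[str]:
    """Split a SQL script on semicolons that are outside single-quoted literals.

    Assumption: no SQL comments and no other quoting styles are present.
    """
    statements: List[str] = []
    current: List[str] = []
    in_quote = False
    for ch in script:
        if ch == "'":
            # An escaped quote ('') toggles twice, so we stay inside the literal.
            in_quote = not in_quote
        if ch == ";" and not in_quote:
            stmt = "".join(current).strip()
            if stmt:  # skip empty statements such as ";;"
                statements.append(stmt)
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:  # last statement without a trailing semicolon
        statements.append(tail)
    return statements
```

Each returned statement could then be handled one at a time, for example by adding the INSERT statements to a statement set, as the reply in this thread suggests with tableEnv.createStatementSet().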

Re: High Availability on Kubernetes

2021-10-25 Thread Xintong Song
Without HA, your job can restore from the latest successful checkpoint only if your jobmanager process / pod has not failed. If the jobmanager failed, the new jobmanager brought up by Kubernetes will not be able to find the latest successful checkpoint without HA. Jobmanager can fail due to not onl

Re: [External] : Re: How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes

2021-10-25 Thread Fuyao Li
Thanks! I got your point. Will try it out. From: Chesnay Schepler Date: Tuesday, October 19, 2021 at 01:44 To: Fuyao Li, user Cc: Rohit Gupta Subject: Re: [External] : Re: How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes 1) Adding it as a dependency to

Re: SplitEnumeratorContext callAsync() cleanup

2021-10-25 Thread Fabian Paul
Hi Mason, Thanks for opening the ticket. Can you also share the log with us when the KafkaEnumerator closed before the async call finished? Best, Fabian

RE: Troubleshooting checkpoint timeout

2021-10-25 Thread Alexis Sarda-Espinosa
Oh, I got it. I should’ve made the connection earlier after you said “Once an operator decides to send/broadcast a checkpoint barrier downstream, it just broadcasts it to all output channels”. I’ll see what I can do about upgrading the Flink version and do some more tests with unaligned checkpo

Re: Getting Errors in Standby Jobmanager pod during installation & after restart on k8s

2021-10-25 Thread Roman Khachatryan
Hi Amit, AFAIK, these exceptions are normal in HA mode as different JM instances are trying to acquire the lease. Regards, Roman On Mon, Oct 25, 2021 at 1:45 PM Amit Bhatia wrote: > > Hi, > > We have deployed two jobmanagers in HA mode on kubernetes using k8s configmap > solution with deployme

Re: Troubleshooting checkpoint timeout

2021-10-25 Thread Piotr Nowojski
Hi Alexis, > Should I understand these metrics as a property of an operator and not of each subtask (at least for aligned checkpoints)? Then “first” and “last” would make sense to me: first/last across all subtasks/channels for a given operator. Those are properties of a subtask. Subtasks are a

RE: Troubleshooting checkpoint timeout

2021-10-25 Thread Alexis Sarda-Espinosa
Hi again, Thanks a lot for taking the time to clarify this. I think that the main thing that is confusing me is that the UI shows Alignment Duration and other checkpoint metrics for each subtask, and the resources you’ve sent always discuss a single barrier per subtask channel. Should I underst

Re: Alternate to PreserveWatermark() in recent Flink versions

2021-10-25 Thread JING ZHANG
Hi Arujit, I got it, sorry for misunderstanding your requirements earlier. Yes, if you already specify a watermark strategy on the source (please see more in document [1]), you don't need to call `DataStream.assignTimestampsAndWatermarks` after the source anymore. The watermark is kept by default.

Getting Errors in Standby Jobmanager pod during installation & after restart on k8s

2021-10-25 Thread Amit Bhatia
Hi, We have deployed two jobmanagers in HA mode on Kubernetes using the k8s ConfigMap solution with a deployment controller. During installation and after restart we are getting the errors below in the standby jobmanager. 2021-10-25 11:17:46,397 ERROR io.fabric8.kubernetes.client.extended.leaderelection.Leader

Re: Troubleshooting checkpoint timeout

2021-10-25 Thread Piotr Nowojski
Hi again Alexis, First answering your questions: > 1. The documentation states “Operators that receive more than one input stream need to align the input streams on the snapshot barriers”. If an operator has parallelism > 1, does that count as more than one stream? Or is there a single output barr

Re: Why we need again kubernetes flink operator?

2021-10-25 Thread Yang Wang
Hi Bhaskar, IIUC, flink-k8s-operator and Flink native K8s mode are orthogonal. They are not meant to replace each other. The flink-k8s-operator is more like a Flink lifecycle management tool. It could make deploying a Flink application on K8s easier. We just need to apply a CR yaml and is more frien

Re: High availability data clean up

2021-10-25 Thread Yang Wang
Hi Weiqing, > Why does Flink not set the owner reference of HA related ConfigMaps to JobManager deployment? It is easier to clean up for users. The major reason is that simply deleting the HA related ConfigMaps will make the HA data located in DFS leak. > How to delete the HA ConfigMap from exter

Re: Flink on kubernetes HA ::Renew deadline reached

2021-10-25 Thread marco
Hello, thanks for your response. I just want to know why you suggest that I should use the native Kubernetes HA. In my case I need to use the standalone application mode. On 2021/10/25 09:25:21, Xintong Song wrote: > You should be using the native Kubernetes HA. This error message suggests > Fl

Re: Time different between checkpoint and savepoint restoration in GCS

2021-10-25 Thread 陳昌倬
On Mon, Oct 25, 2021 at 05:19:32PM +0800, JING ZHANG wrote: > Hi, > > We wonder if this is expected behavior or not? > I think it's expected. You could find more information in document [1]. > Checkpoints and Savepoints differ in their implementation. Checkpoints are > designed to be lightweight an

Re: Not cleanup Kubernetes Configmaps after execution success

2021-10-25 Thread Yang Wang
Hi Hua Wei, I think you need to share the JobManager logs so that we could check whether Flink had tried to clean up the HA related ConfigMaps. Using the "kubectl logs -f >/tmp/log.jm" could help with dumping the logs. Best, Yang Roman Khachatryan wrote on Mon, Oct 25, 2021 at 5:35 PM: > Hi Hua, > > It lo

RE: Troubleshooting checkpoint timeout

2021-10-25 Thread Alexis Sarda-Espinosa
Hi Piotrek, Thanks for all the information, I guess I was reading older versions of the documentation that didn’t have that. I was just using the job graph UI to check backpressure, but after looking at other factors, I think there is indeed some backpressure, but I don’t know how it builds up

Re: Alternate to PreserveWatermark() in recent Flink versions

2021-10-25 Thread Arujit Pradhan
Hey JING, Thanks a lot for replying to the thread! Yeah, we are looking at `PreserveWatermarks`. But the issue is that the datastream.assignTimestampsAndWatermarks() takes `WatermarkStrategy`(from org.apache.flink.api.common.eventtime) and there is no default method to define preserveWaterMark St

Re: Not cleanup Kubernetes Configmaps after execution success

2021-10-25 Thread Roman Khachatryan
Hi Hua, It looks like the ConfigMap is missing HA labels for some reason. Could you confirm that you are running in HA mode? Which deployment mode are you using? [1] I'm also pulling in Yang Wang who might know this area better. [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dep

Re: Time different between checkpoint and savepoint restoration in GCS

2021-10-25 Thread JING ZHANG
Hi, > We wonder if this is expected behavior or not? I think it's expected. You could find more information in document [1]. Checkpoints and Savepoints differ in their implementation. Checkpoints are designed to be lightweight and fast. They might (but don’t necessarily have to) make use of differe

Re: Flink on kubernetes HA ::Renew deadline reached

2021-10-25 Thread marco
Any suggestions would be appreciated. On 2021/10/20 16:18:39, marco wrote: > > > Hello flink community:: > > I am deploying flink application cluster standalone mode on kubernetes, but i > am facing some problems > > the job starts normally and it continues to run but at some point in time

Re: Kafka Stream Window

2021-10-25 Thread JING ZHANG
Hi, I guess you would get suggestions more quickly if you sent this email to the user mailing list of KStream.😀 Or you could use Flink to complete your requirements directly. Mohammed Kamaal wrote on Mon, Oct 25, 2021 at 3:47 PM: > Hi, > > Is there a way to define a sliding window with a count (number of > occurrenc

Re: Time different between checkpoint and savepoint restoration in GCS

2021-10-25 Thread Roman Khachatryan
Hi ChangZhuo, Yes, restoring from a savepoint is expected to be significantly slower than restoring from a checkpoint. Regards, Roman On Mon, Oct 25, 2021 at 9:45 AM ChangZhuo Chen (陳昌倬) wrote: > > Hi, > > We found that our application savepoint restoration time (~ 40 mins) is > much slower than checkpoint re

Re: Alternate to PreserveWatermark() in recent Flink versions

2021-10-25 Thread JING ZHANG
Hi, I'm not sure I understand your requirement. However, are you looking for `PreserveWatermarks` in package `org.apache.flink.table.sources.wmstrategies`? Best, JING ZHANG Arujit Pradhan wrote on Mon, Oct 25, 2021 at 4:02 PM: > Hi all, > > > We maintain an Open-sourced project for protobuf data processing

Alternate to PreserveWatermark() in recent Flink versions

2021-10-25 Thread Arujit Pradhan
Hi all, We maintain an Open-sourced project for protobuf data processing using Flink dagger. But we are currently on Flink-1.9 and want to migrate to the latest stable 1.14. In the older version, we use `StreamTableSource` and `DefinedRowtimeAttributes` APIs

Re: Troubleshooting checkpoint timeout

2021-10-25 Thread Piotr Nowojski
Hi Alexis, You can read about those metrics in the documentation [1]. Long alignment duration and start delay almost always come together. High values indicate long checkpoint barrier propagation times through the job graph, that's always (at least so far I haven't seen a different reason) caused

Kafka Stream Window

2021-10-25 Thread Mohammed Kamaal
Hi, Is there a way to define a sliding window with a count (number of occurrence) of stream in Kafka Stream Window (KStream)? In Flink we have, stream.keyBy().countWindow(3,1) //where 3 is number/size of streams Can you suggest an equivalent of the above in KStream The below code takes only dur
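
To make the semantics being asked about concrete, here is a minimal plain-Python sketch of a sliding count window. It assumes the window fires after every `slide` elements and emits at most the last `size` elements, which is how stream.keyBy().countWindow(3,1) is understood here. The function name `sliding_count_windows` is hypothetical; this uses neither the Flink nor the Kafka Streams API and handles a single key only.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def sliding_count_windows(items: Iterable[T], size: int, slide: int) -> Iterator[List[T]]:
    """Yield the last `size` elements every time `slide` new elements have arrived.

    Assumption: early windows may hold fewer than `size` elements.
    """
    buffer: Deque[T] = deque(maxlen=size)  # oldest elements are evicted automatically
    for count, item in enumerate(items, start=1):
        buffer.append(item)
        if count % slide == 0:  # fire once per `slide` arrivals
            yield list(buffer)
```

With size 3 and slide 1, each new element produces a window over itself and up to two predecessors; a keyed variant would keep one such buffer per key.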

Time different between checkpoint and savepoint restoration in GCS

2021-10-25 Thread 陳昌倬
Hi, We found that our application savepoint restoration time (~ 40 mins) is much slower than checkpoint restoration time (~ 4 mins). We wonder if this is expected behavior or not? Some detail about the environment: * Flink version: 1.14.0 * Persistent storage is GCS, via the following jars: