Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
Thank you for answering all my questions. My suggestion would be to start off with exposing an API to allow dynamically changing operator parallelism, as the users of Flink will be better able to decide the right scaling policy. Once this functionality is there, it's just a matter of providing polici

Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
We do exactly what you mentioned. However, it's not that simple, unfortunately. Our services don't have predictable performance, as traffic varies a lot during the day. As I've explained above, increasing source parallelism to 2 was enough to tip over our services and reducing parallelism of the asy

Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
I am using the async IO operator. The problem is that increasing source parallelism from 1 to 2 was enough to tip our systems over the edge. Reducing the parallelism of the async IO operator to 2 is not an option, as that would reduce the throughput quite a bit. This means that no matter what we do, we'

Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread vishalovercome
Yes. While back-pressure would eventually ensure high throughput, hand tuning parallelism became necessary because the job with high source parallelism would immediately bring down our internal services, not giving Flink enough time to adjust the in-rate. Plus running all operators at such a hi

Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
In one of my jobs, windowing is the costliest operation while upstream and downstream operators are not as resource intensive. There's another operator in this job that communicates with internal services. This has high parallelism as well but not as much as that of the windowing operation. Running

Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Forgot to add one more question - 7. If maxParallelism needs to be set to control parallelism, then wouldn't that mean that we wouldn't ever be able to take a savepoint and rescale beyond the configured maxParallelism? This would mean that we can never achieve hand-tuned resource efficiency. I will
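To make the maxParallelism question concrete, here is a minimal sketch of how keyed state is commonly described as being split into key groups, one per unit of maxParallelism, with each subtask owning a contiguous range. The object and method names are hypothetical, and the range arithmetic reflects my understanding of Flink's key-group assignment rather than a quotation of its source, so treat it as an assumption.

```scala
object KeyGroupRangeSketch {
  // Inclusive range of key groups owned by one subtask (hypothetical helper).
  // Assumption: mirrors the usual description of Flink's key-group ranges.
  // The resulting Range is empty when start > end.
  def rangeFor(subtask: Int, parallelism: Int, maxParallelism: Int): Range = {
    val start = (subtask * maxParallelism + parallelism - 1) / parallelism
    val end = ((subtask + 1) * maxParallelism - 1) / parallelism
    start to end
  }

  // Number of subtasks that would own no key group at all.
  def emptySubtasks(parallelism: Int, maxParallelism: Int): Int =
    (0 until parallelism).count(i => rangeFor(i, parallelism, maxParallelism).isEmpty)
}
```

Under these assumptions, a parallelism above maxParallelism leaves some subtasks without any key group, which is one way to see why maxParallelism acts as an upper bound when rescaling keyed state from a savepoint.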

Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Some questions about adaptive scheduling documentation - "If new slots become available the job will be scaled up again, up to the configured parallelism". Does parallelism refer to maxParallelism or parallelism? I'm guessing it's the latter because the doc later mentions - "In Reactive Mode (see

Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Thank you very much for the new release that makes auto-scaling possible. I'm currently running multiple Flink jobs and I've hand-tuned the parallelism of each of the operators to achieve the best throughput. I would much rather use the auto-scaling capabilities of Flink than have to hand tune my j

Zookeeper or Kubernetes for HA?

2021-05-03 Thread vishalovercome
Flink docs provide details on setting up HA but don't provide any recommendations as such. For jobs running in Kubernetes and having a ZooKeeper deployment, which high availability option would be more desirable? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.co

Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread vishalovercome
This is a very big release! Many thanks to the Flink developers for their contributions to making Flink as good a framework as it is!

Re: Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I researched a bit more and another suggested solution is to build a custom source function that somehow waits for each operator to load its configuration, which is in fact set in the open method of the source itself. I'm not sure if that's a good idea as that just exposes entire job configuration t

Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I have to make my Flink job dynamically configurable and I'm thinking about using broadcast state. My current static job configuration file consists of the configuration of the entire set of operators, which I load into a case class and then I explicitly pass the relevant configuration of each operator as i

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread vishalovercome
I've gone through the example as well as the documentation and I still couldn't understand whether my use case requires joining. 1. What would happen if I didn't join? 2. As the 2 incoming data streams have the same type, if joining is absolutely necessary then just a union (oneStream.union(anotherS

Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread vishalovercome
Let me make the example more concrete. Say O1 gets as input a data stream T1 which it splits into two using some function and produces DataStreams of type T2 and T3, each of which is partitioned by the same key function TK. Now after O2 processes a stream, it could sometimes send the stream to O3

Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-23 Thread vishalovercome
Suppose I have a job with 3 operators with the following job graph:
O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy
If operator O3 receives inputs from two operators and both inputs have the same type and valu
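The routing question above can be sketched without Flink itself. The snippet below assumes that the target subtask of a keyed record depends only on the key, the receiving operator's parallelism, and its maxParallelism, not on which upstream operator sent the record. All names are hypothetical, and the hash is a plain stand-in rather than the hash Flink applies internally.

```scala
object KeyRoutingSketch {
  // Map a key to a key group. Assumption: a simple hashCode-based stand-in,
  // not Flink's actual internal hash function.
  def keyGroup(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Map a key group to the index of the subtask that owns it.
  def subtaskIndex(keyGroup: Int, parallelism: Int, maxParallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // Full routing decision for one record key; the sender is not an input.
  def route(key: Any, parallelism: Int, maxParallelism: Int): Int =
    subtaskIndex(keyGroup(key, maxParallelism), parallelism, maxParallelism)
}
```

For example, `KeyRoutingSketch.route("user-42", 4, 128)` yields the same subtask index whether the record arrived via O1 or via O2, because the upstream operator never enters the computation. Whether that holds for a given job still depends on both inputs using the same key function and feeding the same keyed operator.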

Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
I don't know how to reproduce it but what I've observed are three kinds of termination when connectivity with ZooKeeper is somehow disrupted. I don't think it's an issue with ZooKeeper, as it has supported a much bigger Kafka cluster for a few years. 1. The first kind is exactly this - https://github.

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-21 Thread vishalovercome
Thanks for your reply! What I have seen is that the job terminates when there's intermittent loss of connectivity with ZooKeeper. This is in fact the most common reason why our jobs are terminating at this point. Worse, it's unable to restore from checkpoint during some (not all) of these terminat

Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
Thanks for your reply! What I have seen is that the job terminates when there's intermittent loss of connectivity with ZooKeeper. This is in fact the most common reason why our jobs are terminating at this point. Worse, it's unable to restore from checkpoint during some (not all) of these terminat

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-16 Thread vishalovercome
I'm not sure if this addresses the original concern. For instance, consider this sequence: 1. Job starts from savepoint 2. Job creates a few checkpoints 3. Job manager (just one in Kubernetes) crashes and restarts with the commands specified in the Kubernetes manifest which has the savepoint path

Re: state inside functions

2020-12-16 Thread vishalovercome
When running in HA mode or taking savepoints, if we pass configuration as constructor arguments, then it seems as though changing configuration at a later time doesn't work as it uses state to restore older configuration. How can we pass configuration while having the flexibility to change the valu

Changing application configuration when restoring from checkpoint/savepoint

2020-12-16 Thread vishalovercome
My Flink job loads several configuration files that contain job, operator and business configuration. One of the operators is an AsyncOperator with a function like so: class AsyncFun(config: T) extends RichAsyncFunction[X, Y] { @transient private lazy val client = f(config, metricGroup, etc.)

Re: Changing application configuration when restoring from checkpoint/savepoint

2020-12-16 Thread vishalovercome
Will this work - In the main method, serialize config into a string and store it using ParameterTool with key as taskName and value as config (serialized as string). Then in the open method, look up the relevant configuration using getTaskName(). A follow-up to this would be configuring custom windowin
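The idea in this post can be sketched in plain Scala, with an immutable Map standing in for the job-wide parameters that ParameterTool would carry. The config type, the serialization format, and the task name used below are all hypothetical, chosen only to illustrate the lookup; a real job would use its own config class and a proper serializer.

```scala
object TaskConfigSketch {
  // Hypothetical per-operator configuration.
  final case class AsyncConfig(endpoint: String, timeoutMs: Long)

  // Naive "endpoint|timeout" encoding, an assumption for this sketch only.
  def serialize(c: AsyncConfig): String = s"${c.endpoint}|${c.timeoutMs}"

  def deserialize(s: String): AsyncConfig = {
    // Split on the last '|' so that the endpoint may itself contain one.
    val i = s.lastIndexOf('|')
    AsyncConfig(s.substring(0, i), s.substring(i + 1).toLong)
  }

  // What main() would do: one entry per task name, value serialized as a string.
  def buildParameters(configs: Map[String, AsyncConfig]): Map[String, String] =
    configs.map { case (taskName, c) => taskName -> serialize(c) }

  // What open() would do: look up this task's entry and decode it.
  def lookup(params: Map[String, String], taskName: String): Option[AsyncConfig] =
    params.get(taskName).map(deserialize)
}
```

Because the configuration is rebuilt in the main method on every submission rather than captured in the function's constructor, a restart should pick up new values. One caveat worth checking is that the name returned by getTaskName() may not match the operator name when operators are chained, so the keys may need to be chosen accordingly.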

Will job manager restarts lead to repeated savepoint restoration?

2020-12-16 Thread vishalovercome
My Flink job runs in Kubernetes. This is the setup: 1. One job running as a job cluster with one job manager 2. HA powered by ZooKeeper (works fine) 3. Job/Deployment manifests stored in GitHub and deployed to Kubernetes by Argo 4. State persisted to S3 If I were to stop (drain and take a savepoi