Thank you for answering all my questions. My suggestion would be to start off
by exposing an API that allows dynamically changing operator parallelism, since
the users of Flink are better placed to decide the right scaling policy.
Once this functionality is there, it's just a matter of providing policies…
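Purely as an illustration of the kind of API being suggested (nothing like this
existed in Flink at the time), a hypothetical client interface might look like
the following sketch:

// Hypothetical sketch only: no such interface exists in Flink.
// It illustrates handing the rescaling decision to the user.
trait RescaleClient {
  /** Request a new parallelism for one operator of a running job. */
  def rescaleOperator(jobId: String, operatorId: String, newParallelism: Int): Unit
}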
We do exactly what you mentioned. Unfortunately, however, it's not that simple.
Our services don't have predictable performance, as traffic varies a lot during
the day.
As I've explained above, increasing source parallelism to 2 was enough to tip
over our services, and reducing the parallelism of the async…
I am using the async IO operator. The problem is that increasing source
parallelism from 1 to 2 was enough to tip our systems over the edge.
Reducing the parallelism of the async IO operator to 2 is not an option, as
that would reduce throughput quite a bit. This means that no matter what we
do, we…
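For reference, a minimal sketch of the tension being described, using Flink's
async IO API: the source and the async IO operator carry independent
parallelism settings, so one can be tuned without the other. LookupFunction and
the timeout/capacity values are stand-ins, not the poster's actual job.

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical async function standing in for the real service client.
class LookupFunction extends AsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    resultFuture.complete(Seq(s"enriched-$input")) // a real job would call the service here
}

object AsyncIoParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source kept at parallelism 1 so the ingest rate stays gentle...
    val source = env.fromElements("a", "b", "c").setParallelism(1)

    // ...while the async IO operator is tuned independently.
    val enriched = AsyncDataStream
      .unorderedWait(source, new LookupFunction, 5000L, TimeUnit.MILLISECONDS, 100)
      .setParallelism(4)

    enriched.print()
    env.execute("async-io-parallelism-sketch")
  }
}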
Yes. While back-pressure would eventually ensure high throughput, hand-tuning
parallelism became necessary because the job with high source parallelism
would immediately bring down our internal services, not giving Flink enough
time to adjust the in-rate. Plus, running all operators at such a high…
In one of my jobs, windowing is the costliest operation, while the upstream and
downstream operators are not as resource intensive. There's another operator
in this job that communicates with internal services. It has high
parallelism as well, but not as high as that of the windowing operation.
Running…
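A minimal sketch of this kind of hand tuning, with arbitrary parallelism values
and a trivial pipeline standing in for the real job: the windowing stage gets
the highest parallelism and a stand-in downstream operator a lower one.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object HandTunedParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("key", 1), ("key", 2))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1).setParallelism(16)     // costliest stage: windowing
      .map(x => x).setParallelism(8) // stand-in for the service-calling operator
      .print().setParallelism(1)

    env.execute("hand-tuned-parallelism-sketch")
  }
}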
Forgot to add one more question - 7. If maxParallelism needs to be set to
control parallelism, then wouldn't that mean we would never be able to take a
savepoint and rescale beyond the configured maxParallelism? That would mean we
can never achieve hand-tuned resource efficiency. I will…
Some questions about the adaptive scheduling documentation - "If new slots
become available the job will be scaled up again, up to the configured
parallelism".
Does parallelism here refer to maxParallelism or parallelism? I'm guessing it's
the latter, because the doc later mentions - "In Reactive Mode (see…
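For context, a minimal sketch of the two knobs in question (the values are
arbitrary): parallelism is how many subtasks actually run, while maxParallelism
fixes the number of key groups and therefore bounds any future rescaling of
keyed state.

import org.apache.flink.streaming.api.scala._

object ParallelismKnobsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(4)      // how many subtasks actually run
    env.setMaxParallelism(128) // number of key groups; upper bound for rescaling

    // Both knobs can also be set per operator:
    env.fromElements(1, 2, 3)
      .map(_ * 2).setParallelism(2).setMaxParallelism(32)
      .print()

    env.execute("parallelism-knobs-sketch")
  }
}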
Thank you very much for the new release that makes auto-scaling possible. I'm
currently running multiple Flink jobs, and I've hand-tuned the parallelism of
each of the operators to achieve the best throughput. I would much rather
use the auto-scaling capabilities of Flink than have to hand-tune my jobs…
The Flink docs provide details on setting up HA but don't provide any
recommendations as such. For jobs running in Kubernetes with a Zookeeper
deployment, which high-availability option would be more desirable?
This is a very big release! Many thanks to the Flink developers for their
contributions to making Flink as good a framework as it is!
I researched a bit more, and another suggested solution is to build a custom
source function that somehow waits for each operator to load its
configuration, which is in fact set in the open method of the source itself.
I'm not sure that's a good idea, as it just exposes the entire job
configuration to…
I have to make my Flink job dynamically configurable, and I'm thinking about
using broadcast state. My current static job configuration file consists of
the configuration for the entire set of operators, which I load into a case
class; I then explicitly pass the relevant configuration of each operator as i…
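A minimal sketch of the broadcast state pattern under discussion, with a
hypothetical OperatorConfig case class and a fixed-element stream standing in
for a real source of config updates (e.g. Kafka):

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical placeholder for one operator's slice of the job configuration.
case class OperatorConfig(params: Map[String, String])

object DynamicConfigSketch {
  // Broadcast state: operator name -> its current configuration.
  val configDescriptor = new MapStateDescriptor[String, OperatorConfig](
    "operator-config",
    BasicTypeInfo.STRING_TYPE_INFO,
    TypeInformation.of(classOf[OperatorConfig]))

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val events: DataStream[String] = env.fromElements("e1", "e2")
    // In a real job this would be a stream of (operator name, config) updates.
    val configUpdates: DataStream[(String, OperatorConfig)] =
      env.fromElements(("my-operator", OperatorConfig(Map("threshold" -> "10"))))

    events
      .connect(configUpdates.broadcast(configDescriptor))
      .process(new BroadcastProcessFunction[String, (String, OperatorConfig), String] {
        override def processElement(
            value: String,
            ctx: BroadcastProcessFunction[String, (String, OperatorConfig), String]#ReadOnlyContext,
            out: Collector[String]): Unit = {
          // Read the latest broadcast configuration for this operator.
          val conf = ctx.getBroadcastState(configDescriptor).get("my-operator")
          out.collect(s"$value processed with $conf")
        }

        override def processBroadcastElement(
            update: (String, OperatorConfig),
            ctx: BroadcastProcessFunction[String, (String, OperatorConfig), String]#Context,
            out: Collector[String]): Unit =
          ctx.getBroadcastState(configDescriptor).put(update._1, update._2)
      })
      .print()

    env.execute("dynamic-config-sketch")
  }
}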
I've gone through the example as well as the documentation, and I still
couldn't understand whether my use case requires joining. 1. What would
happen if I didn't join? 2. As the two incoming data streams have the same
type, if joining is absolutely necessary, would just a union
(oneStream.union(anotherS…
Let me make the example more concrete. Say O1 gets as input a data stream T1,
which it splits into two using some function, producing DataStreams of
types T2 and T3, each of which is partitioned by the same key function TK.
Now, after O2 processes a stream, it could sometimes send the stream to O3…
Suppose I have a job with 3 operators with the following job graph:
O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy
If operator O3 receives inputs from two operators and both inputs have the
same type and value…
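A minimal sketch of the union-then-keyBy option raised above, with a
hypothetical Event type and key field standing in for T2/T3 and TK:

import org.apache.flink.streaming.api.scala._

// Hypothetical event type standing in for the shared type of the two inputs.
case class Event(key: String, payload: String)

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-ins for the outputs of O1 and O2 (same type, same key function).
    val fromO1: DataStream[Event] = env.fromElements(Event("a", "from-O1"))
    val fromO2: DataStream[Event] = env.fromElements(Event("a", "from-O2"))

    // union interleaves the two streams without pairing elements; keyBy after
    // it ensures records with the same key reach the same O3 subtask.
    fromO1
      .union(fromO2)
      .keyBy(_.key)
      .map(e => s"O3 saw ${e.payload}")
      .print()

    env.execute("union-sketch")
  }
}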
I don't know how to reproduce it, but what I've observed are three kinds of
termination when connectivity with Zookeeper is somehow disrupted. I don't
think it's an issue with Zookeeper, as it has supported a much bigger Kafka
cluster for a few years.
1. The first kind is exactly this -
https://github.
Thanks for your reply!
What I have seen is that the job terminates when there's an intermittent loss
of connectivity with Zookeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, it's unable to restore from a
checkpoint during some (not all) of these terminations…
I'm not sure this addresses the original concern. For instance, consider
this sequence:
1. The job starts from a savepoint.
2. The job creates a few checkpoints.
3. The job manager (just one in Kubernetes) crashes and restarts with the
commands specified in the Kubernetes manifest, which has the savepoint path…
When running in HA mode or taking savepoints, if we pass configuration as
constructor arguments, then changing the configuration at a later time doesn't
seem to work, as the job uses state to restore the older configuration. How
can we pass configuration while keeping the flexibility to change the values…
My Flink job loads several configuration files that contain job, operator, and
business configuration. One of the operators is an AsyncOperator with a
function like so:
class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
  @transient private lazy val client = f(config, metricGroup, etc.)
  // ...
}
Will this work - in the main method, serialize the config into a string and
store it using ParameterTool, with the task name as the key and the config
(serialized as a string) as the value. Then, in the open method, look up the
relevant configuration using getTaskName().
A follow-up to this would be configuring custom windowing…
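A minimal sketch of that idea via global job parameters, with a hypothetical
operator name "my-enricher" and a plain string standing in for the serialized
config. Note that getTaskName() may return a chained task name, so a fixed key
is used for the lookup here:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object GlobalParamsConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical: one serialized config string per operator name.
    val kv = new java.util.HashMap[String, String]()
    kv.put("my-enricher", """{"endpoint":"http://internal-service"}""")
    env.getConfig.setGlobalJobParameters(ParameterTool.fromMap(kv))

    env.fromElements("a", "b")
      .map(new RichMapFunction[String, String] {
        @transient private var config: String = _

        override def open(parameters: Configuration): Unit = {
          // Retrieve the job-wide parameters on the task manager side.
          val globals = getRuntimeContext.getExecutionConfig
            .getGlobalJobParameters.asInstanceOf[ParameterTool]
          config = globals.get("my-enricher")
        }

        override def map(value: String): String = s"$value via $config"
      })
      .name("my-enricher")
      .print()

    env.execute("global-params-config-sketch")
  }
}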
My Flink job runs in Kubernetes. This is the setup:
1. One job running as a job cluster with one job manager
2. HA powered by Zookeeper (works fine)
3. Job/Deployment manifests stored in GitHub and deployed to Kubernetes by
Argo
4. State persisted to S3
If I were to stop (drain and take a savepoint…