Re: KeyBy distribution across taskslots

2019-02-28 Thread Congxian Qiu
Hi, maybe you could add a prefix to each key, so the hot keys can be distributed across many tasks. Best, Congxian On Feb 28, 2019, 21:16 +0800, Yun Tang wrote: > Hi, > > If you noticed that some key groups are hot and in high load, you could try > to increase the total key groups number (by increas
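The key-prefix idea above can be sketched roughly as follows. This is a minimal, dependency-free illustration and not code from the thread: the class name `SaltedKeys`, the `#` separator, the choice to append the salt rather than prepend it, and the use of a running sequence number to pick the bucket are all assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

public class SaltedKeys {
    // Appends a salt in [0, buckets) so that one hot key turns into
    // `buckets` distinct keys that can hash to different tasks.
    // (The separator and the sequence-based bucket choice are assumptions.)
    public static String salt(String key, long sequence, int buckets) {
        return key + "#" + Math.floorMod(sequence, buckets);
    }

    // Removes the salt again, e.g. before a second aggregation
    // that is keyed on the original key.
    public static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    public static void main(String[] args) {
        // Count how many records of one hot key land on each salted key.
        Map<String, Integer> counts = new TreeMap<>();
        for (long i = 0; i < 8; i++) {
            counts.merge(salt("hot", i, 4), 1, Integer::sum);
        }
        System.out.println(counts);
    }
}
```

In a job, the salted value would be what is passed to `keyBy`; results for the original key then typically need a second aggregation step keyed on the unsalted key.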

Re: How do I compute the average and keep track of a state over a window in DataStream?

2019-02-28 Thread Congxian Qiu
Hi Felipe, maybe you could use a process function [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html Best, Congxian On Feb 28, 2019, 22:47 +0800, Felipe Gutierrez wrote: > Hi all, > > I want to compute the average of two stream data sou

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud, Thanks for the further feedback! For option 1: 40 min still does not make sense, which indicates it might take more time to finish a checkpoint in your case. I have also seen catch-up scenarios where a single checkpoint took several hours to finish. If the current checkpoint

Re: Job Manager not able to fetch job info when restarted

2019-02-28 Thread sen
If I want the job to be restarted after the JobManager restarts, does HA mode have to use ZooKeeper? high-availability: zookeeper

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul, Thanks for your feedback. If the at-least-once mode still causes the problem, we can confirm it is not caused by the blocking behavior in exactly-once mode mentioned before. For at-least-once, the task continues processing the buffers that follow the barriers during alignment. But fo

Flink Kinesis Consumer

2019-02-28 Thread Steven Nelson
Hello! Does anyone know if the Flink Kinesis Consumer supports stopping rather than cancelling? I don't see that it implements StoppableFunction, but I might be wrong. -Steve

RE: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread LINZ, Arnaud
Update: Option 1 does not work. It still fails at the end of the timeout, no matter its value. Should I implement a “bandwidth” management system by using artificial Thread.sleep in the source depending on the back pressure? From: LINZ, Arnaud Sent: Thursday, 28 February 2019 15:47 To: 'zhijiang
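The `Thread.sleep`-based throttling asked about above could look roughly like the sketch below. It is only an illustration under assumptions: the class `SourceThrottle`, its fixed records-per-second limit, and the method names are hypothetical, and nothing in the thread establishes that throttling the source resolves the checkpoint timeouts.

```java
import java.util.concurrent.TimeUnit;

public class SourceThrottle {
    private final long minNanosPerRecord;
    private long nextAllowedNanos;
    private boolean started;

    // recordsPerSecond is a hypothetical, fixed upper bound on the emit rate.
    public SourceThrottle(long recordsPerSecond) {
        this.minNanosPerRecord = 1_000_000_000L / recordsPerSecond;
    }

    // Returns how long the caller should wait before emitting the next
    // record, and reserves the following slot. Takes the clock value as a
    // parameter so the pacing logic can be checked without sleeping.
    public long nanosToWait(long nowNanos) {
        if (!started) {
            started = true;
            nextAllowedNanos = nowNanos;
        }
        long wait = Math.max(0L, nextAllowedNanos - nowNanos);
        // Equivalent to max(now, nextAllowed) + minNanosPerRecord.
        nextAllowedNanos = nowNanos + wait + minNanosPerRecord;
        return wait;
    }

    // Blocks the calling thread until the next record may be emitted.
    public void acquire() throws InterruptedException {
        long wait = nanosToWait(System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }
}
```

A source would call `acquire()` in its emit loop before handing each record downstream. Adapting the limit to the observed back pressure, as the question suggests, is left out of this sketch.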

Re: Flink Standalone cluster - production settings

2019-02-28 Thread Padarn Wilson
Can you give some detail on the cases in which you might be better off setting a higher (or lower) parallelism for an operator? On Thu, Feb 21, 2019 at 9:54 PM Hung wrote: > / Each job has 3 asynch operators > with Executors with thread counts of 20,20,100/ > > Flink handles parallelisms for y

Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-28 Thread Austin Cawley-Edwards
Hi Gary, No, I am running a YARN session (which I start with: flink-yarn-session --slots 4 --taskManagerMemory 16GB --jobManagerMemory 3GB --detached) and submit jobs through the REST interface. Thank you for the tips - I will probably shade it on my side. Is there an official location that the ub

Re: Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Kumar Bolar, Harshith
Thanks a lot. Looking into the logs sounds like a much cleaner approach :-) From: Till Rohrmann Date: Thursday, 28 February 2019 at 8:14 PM To: Harshith Kumar Bolar Cc: user Subject: [External] Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory? Yes this is on

RE: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread LINZ, Arnaud
Hi Zhijiang, Thanks for your feedback. * I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and will let you know. Setting it higher than 40 min does not make much sense since after 40 min the pending output is already quite large. * Option 3 won’t work; I already ta

How do I compute the average and keep track of a state over a window in DataStream?

2019-02-28 Thread Felipe Gutierrez
Hi all, I want to compute the average of two stream data sources and also keep track of a ValueState variable, which is a CountMinSketch class that I implemented. For this, I tried to use RichAggregateFunction; however, it throws an exception: Exception in thread "main" java.lang.Unsupp
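For the averaging part of the question above, the usual accumulator pattern is sketched below in plain Java. The method names mirror Flink's `AggregateFunction` interface (`createAccumulator`, `add`, `getResult`, `merge`), but the class `AverageSketch` itself is hypothetical, does not implement the Flink interface, and does not cover the CountMinSketch state from the question.

```java
public class AverageSketch {
    // Running count and sum; this is all the state an average needs.
    public static final class Acc {
        long count;
        double sum;
    }

    public static Acc createAccumulator() {
        return new Acc();
    }

    // Folds one value into the accumulator.
    public static Acc add(double value, Acc acc) {
        acc.count++;
        acc.sum += value;
        return acc;
    }

    // Combines two partial accumulators, e.g. when windows are merged.
    public static Acc merge(Acc a, Acc b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    // Returns the average, or 0.0 for an empty accumulator (an assumption;
    // other conventions such as NaN are equally reasonable).
    public static double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }
}
```

Keeping count and sum separately, rather than a running mean, makes `merge` exact and keeps each update a constant-time operation.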

Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Till Rohrmann
Yes this is one way. Another way could be to look into the logs of the running TaskManagers. They should contain the path of the blob store directory. Cheers, Till On Thu, Feb 28, 2019 at 12:04 PM Kumar Bolar, Harshith wrote: > Is there any way to figure out which one is being run on the TaskMa

Re: event time & watermarks in connected streams with broadcast state

2019-02-28 Thread Rinat
Thanks Konstantin! > On 28 Feb 2019, at 02:33, Konstantin Knauf wrote: > > Hi Rinat, > > to my knowledge your workaround is fine & necessary. You can also emit a > Long.MAX_VALUE instead of the processing time to save some CPU cycles. > > Cheers, > > Konstantin > > > > On Wed, Feb 27, 2
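The reason a Long.MAX_VALUE watermark helps can be sketched as follows: an operator with two inputs advances its event time to the minimum of the watermarks seen on both inputs, so an input that never emits a watermark holds the operator back. The class `CombinedWatermark` below is a hypothetical, simplified model of that rule and not Flink's implementation; reading the workaround in the thread as "emit watermarks on the broadcast side" is also an assumption, since the quoted message is truncated.

```java
public class CombinedWatermark {
    // Both inputs start at Long.MIN_VALUE, i.e. "no watermark seen yet".
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;

    // Records a watermark from the first input and returns the combined one.
    public long onWatermark1(long timestamp) {
        input1 = Math.max(input1, timestamp);
        return current();
    }

    // Records a watermark from the second input and returns the combined one.
    public long onWatermark2(long timestamp) {
        input2 = Math.max(input2, timestamp);
        return current();
    }

    // The operator's event time is the minimum over both inputs.
    public long current() {
        return Math.min(input1, input2);
    }
}
```

Once one side has reported Long.MAX_VALUE, the combined watermark follows the other side alone, and the first side never needs to emit another watermark.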

Re: KeyBy distribution across taskslots

2019-02-28 Thread Yun Tang
Hi, If you noticed that some key groups are hot and in high load, you could try to increase the total number of key groups (by increasing the max parallelism), but note that previous checkpoints can then no longer be restored. With the help of this, we might let the hot key groups share

Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-28 Thread Gary Yao
Hi Austin, Are you running your job detached in a per-job cluster? In that case inverted class loading does not work. This is because we add the user jar to the system class path, and there is no dynamic class loading involved at the moment [1]. You can try the YARN session mode, or – as Chesnay

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread Paul Lam
Hi Zhijiang, Thanks a lot for your reasoning! I tried to set the checkpoint to at-least-once as you suggested, but unfortunately the problem remains the same :( IMHO, if it’s caused by barrier alignment, the state size (mainly buffers during alignment) would be big, right? But actually it’s not,

Re: Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Kumar Bolar, Harshith
Is there any way to figure out which one is being run on the TaskManager? Would it be safe to assume that it is the latest directory created? Regards, Harshith From: Till Rohrmann Date: Thursday, 28 February 2019 at 3:28 PM To: Harshith Kumar Bolar Cc: user Subject: [External] Re: Re: What ar

Re: submit job failed on Yarn HA

2019-02-28 Thread Gary Yao
Hi Sen, I took a look at the CLI code again, and found out that -m is ignored if high-availability: ZOOKEEPER is configured in your flink-conf.yaml. This does not seem right and should be at least documented [1]. Judging from the client logs that you provided, I think the problem is that the cli

Re: KeyBy distribution across taskslots

2019-02-28 Thread Fabian Hueske
Hi, The answer is in fact no. Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to tasks, i.e., a task can process more than one key group. AFAIK, there are no plans to change this behavior. Stefan (in CC) might be able to give more details on this. Something that might
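The two-step mapping described above (key to key group, key group to task) can be sketched as below. The index formula `keyGroup * parallelism / maxParallelism` follows Flink's `KeyGroupRangeAssignment`; the `scramble` function is only a stand-in for the murmur hash Flink applies to `hashCode()`, so the concrete key groups this sketch produces will differ from Flink's. The class name `KeyGroupSketch` is hypothetical.

```java
public class KeyGroupSketch {
    // Stand-in bit mixer, NOT Flink's murmur hash; it only keeps the
    // sketch free of dependencies.
    static int scramble(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    // Step 1: a key is hashed into one of maxParallelism key groups.
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(scramble(key.hashCode()), maxParallelism);
    }

    // Step 2: contiguous ranges of key groups are assigned to the
    // parallel tasks, so one task usually owns several key groups.
    public static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "c"}) {
            int group = keyGroupFor(key, maxParallelism);
            System.out.println(key + " -> key group " + group
                + " -> task " + operatorIndexFor(group, parallelism, maxParallelism));
        }
    }
}
```

With a max parallelism of 128 and a parallelism of 4, each task owns 32 consecutive key groups. A single hot key always maps to exactly one key group and therefore to one task, which is why raising the max parallelism separates hot key groups from each other but cannot split one hot key.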

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud, I think there are two key points. First, under high backpressure the source might emit the checkpoint barrier with a delay because of lock synchronization. Second, the barrier has to queue behind the in-flight data buffers, so the downstream task has to process all the buffers before the barriers to tr

Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-28 Thread Till Rohrmann
Yes, at the moment this does not happen automatically. When deleting the directories you have to be careful not to delete the directory of a running TaskManager. Cheers, Till On Wed, Feb 27, 2019 at 6:29 PM Kumar Bolar, Harshith wrote: > Thanks Till, > > > > It appears to occur when a task mana

Setting source vs sink vs window parallelism with data increase

2019-02-28 Thread Padarn Wilson
Hi all, I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified it is something like: Data: (key1, count, time) Source -> Map(x -> (x, newKeyList(x.key1)) -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time)) -> Keyby(_

Re: Collapsing watermarks after keyby

2019-02-28 Thread Padarn Wilson
I created a small test to see if I could replicate this... but I couldn't :-) Below is my code that provides a counter example. It is not very clean, but perhaps it is useful for someone else in the future: class SessionWindowTest extends FunSuite with Matchers { test("Should advance watermark

Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread LINZ, Arnaud
Hello, I have a simple streaming app that gets data from a source and stores it to HDFS using a sink similar to the bucketing file sink. Checkpointing mode is “exactly once”. Everything is fine in the “normal” case, as the sink is faster than the source; but when we stop the application for a whil

Re: Breakage in Flink CLI in 1.5.0

2019-02-28 Thread sen
Hi Till: So how can we get the right REST address and port when using HA mode on YARN? I get it from the REST API "/jars". But when I submit a job using flink run -m, it fails. org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the

Two Kubernetes clusters and one Flink cluster?

2019-02-28 Thread Thomas Eckestad
Hi, I'm working with two separate Kubernetes clusters located in different regions (hosted in proprietary data centers), the distance between the regions introduces a pretty high (~50ms) latency between the clusters, so communication should not go cross-site unless necessary. I would like to us

Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul, I am not sure whether the task thread is involved in some work while snapshotting state for FsStateBackend. But I have seen another situation that might also cause your problem. From your descriptions below, the last task is blocked by `SingleInputGate.getNextBufferOrEvent` that means the