Running Flink on kerberized HDP 3.1 (minimal getting started)

2020-06-12 Thread Georg Heiler
Hi, I am trying to run Flink on a kerberized HDP 3.1 instance and need some help getting started. https://stackoverflow.com/questions/62330689/execute-flink-1-10-on-a-hdp-3-1-cluster-to-access-hive-tables describes how far I have gotten. In the end, I want to be able to start task managers on YA

Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

2020-06-12 Thread Yun Gao
Hi Felipe, I tested the basic RideCleansingExercise[1] job, which uses the TaxiRide type, locally and it seems to start up normally. Could you also share the code you are currently running and the full stack trace of the exception? Best, Yun [1] https://github.com/ververica/flink-tr

Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Xintong Song
Yes, that is correct. 'taskmanager.memory.process.size' is the most recommended. Thank you~ Xintong Song On Fri, Jun 12, 2020 at 10:59 PM Clay Teeter wrote: > Ok, this is great to know. So in my case; I have a k8 pod that has a > limit of 4Gb. I should remove the -Xmx and add one of these

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arvid Heise
Hi Arti, ouch 3M is pretty far off the current setting. Flink aside, you need to use 100 machines at the very minimum with the current approach (AsyncHTTP and your evaluated machine). That's probably a point where I'd try other libraries first and most importantly I'd evaluate different machines.

Re: Restore from savepoint through Java API

2020-06-12 Thread David Anderson
You can study LocalStreamingFileSinkTest [1] for an example of how to approach this. You can use the test harnesses [2], keeping in mind that - initializeState is called during instance creation - the provided context indicates if state is being restored from a snapshot - snapshot is called when t

How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?

2020-06-12 Thread Felipe Gutierrez
Hi, I am using the flink training exercise TaxiRide [1] to execute a stream count of events. On the cluster and on my local machine I am receiving the message that joda.Time cannot be serialized "class org.joda.time.LocalDateTime is not a valid POJO type". However it is starting the job on the clu

Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Clay Teeter
Ok, this is great to know. So in my case; I have a k8 pod that has a limit of 4Gb. I should remove the -Xmx and add one of these -D parameters. * taskmanager.memory.flink.size * *taskmanager.memory.process.size. <- Probably this one* * taskmanager.memory.task.heap.size and taskmanager.memory.m

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arti Pande
Hi Arvid, *Shared api client*: Actually, while writing I forgot to mention that we switched to a static shared instance of the async http client for all 7 subtasks of the AsyncIO. The number of threads therefore is not 140 (20 * 7) but just (16 + 8 or 16 = 24 or 32) which includes a static sha

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arvid Heise
Hi Arti, Thank you very much for providing so much information. One additional test that you could do is to check how the pipeline performs when you mock the actual HTTP request and directly return a static response through Async IO. This would give you an exact number including potential serializat

Re: Automatically resuming failed jobs in K8s

2020-06-12 Thread Averell
Thank you very much, Yang.

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Xintong Song
> Assuming no -Xmx is set, the doc above says 1/4 of physical memory i.e. 29GB will be used.
This is true.
> So, if I can set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am assuming the heap max of 102Gb will be used in the N/w mem calculation. Is that the right way to set env.java.opt

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Vijay Balakrishnan
Hi Xintong, Just to be clear: I haven't set any -Xmx. I will check our scripts again. Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e. 29GB, will be used. So, if I can set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am assuming the heap max of 102Gb will be used in the N/w

Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Xintong Song
I would suggest not setting -Xmx. Flink will always calculate the JVM heap size from the configuration and set a proper -Xmx. If you manually set an -Xmx that overwrites the one Flink calculated, it might result in unpredictable behavior. Please refer to this document[1]. In short, you could levera

Re: Flink Async IO operator tuning / micro-benchmarks

2020-06-12 Thread Arti Pande
Hi Arvid, Thanks for the quick reply. I totally agree with you on the differences between microbenchmarks and a full benchmark with a specific use-case. Thanks for sending the microbenchmark screenshot. For our use-case, the streaming pipeline has five main transformations that have business logic, of

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Xintong Song
Flink should have calculated the heap size and set the -Xms, according to the equations I mentioned. So if you haven't set a customized -Xmx that overwrites this, it should not use the default 1/4 of physical memory. > > >- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - >

Re: The network memory min (64 mb) and max (1 gb) mismatch

2020-06-12 Thread Clay Teeter
Thank you Xintong, while tracking down the existence of bash-java-utils.jar I found a bug in my CI scripts that incorrectly built the wrong version of flink. I fixed this and then added a -Xmx value. env: - name: FLINK_ENV_JAVA_OPTS value: "-Xmx{{ .Values.analytics.flink.tas

Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard

2020-06-12 Thread Vijay Balakrishnan
Thx, Xintong for a great answer. Much appreciated. https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap Max heap: if -Xmx is set then it is its value else ¼ of physical machine memory estimated by the JVM. No -Xmx is set. So, 1/4 of 102GB = 25.5GB but not sure a

Re: Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Xintong Song
To be more specific, your 1400m total memory should also consist of: - 572MB heap memory (-Xmx & -Xms) - 268MB direct memory (-XX:MaxDirectMemorySize) - 560MB managed memory (native memory, calculated as 1400m * managedMemoryFraction, the fraction is by default 0.4) Thank you~ Xint
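
The arithmetic in the message above can be checked with a few lines of Python. This is only a sanity check of the quoted figures, not a reproduction of how Flink derives them internally. The variable names are made up for illustration; the numbers (1400m total, 0.4 managed fraction, 572MB heap, 268MB direct) are taken from the message.

```python
# Sanity check of the memory breakdown quoted in the message (values in MB).
# Variable names are illustrative only; they are not Flink configuration keys.
total_mb = 1400            # the 1400m total memory from the message
managed_fraction = 0.4     # default managed memory fraction, per the message

# Managed memory is described as total * managedMemoryFraction.
managed_mb = round(total_mb * managed_fraction)

heap_mb = 572              # heap memory (-Xmx & -Xms), as quoted
direct_mb = 268            # direct memory (-XX:MaxDirectMemorySize), as quoted

components = {"heap": heap_mb, "direct": direct_mb, "managed": managed_mb}
for name, mb in components.items():
    print(f"{name}: {mb} MB")
print(f"sum: {sum(components.values())} MB of {total_mb} MB")
```

The three components add up to the 1400m total, which is why the heap seen by the JVM (572MB) is much smaller than the configured value.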

Re: Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Xintong Song
Hi Li, FLINK_TM_HEAP corresponds to the legacy configuration option "taskmanager.heap.size". It is supported for backwards compatibility. I strongly recommend using "taskmanager.memory.flink.size" or "taskmanager.memory.process.size" instead, which can be passed either in "flink-conf.yaml" or

Flink 1.10.1 not using FLINK_TM_HEAP for TaskManager JVM Heap size correctly?

2020-06-12 Thread Li Peng
Hey folks, we recently migrated from Flink 1.9.x to 1.10.1, and we noticed some wonky behavior in how the JVM is configured: 1. We add FLINK_JM_HEAP=5000m and FLINK_TM_HEAP=1400m variables to the environment 2. The JobManager allocates the right heap size as expected 3. However, the TaskManager (start