Re: Running Flink on Yarn

2019-01-02 Thread Anil
Hi Andrey. Thanks for the reply, and apologies for the late follow-up; I was out of office. Suppose I have 3 TMs, each with 3 task slots, and each Kafka stream has 9 partitions. Each thread will consume from stream 1 (a1) and stream 2 (a2). Considering the query, data will need to be buffered
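A toy Python model of the arithmetic in this scenario: 3 TMs × 3 slots gives 9 source subtasks, so with 9 partitions per topic each subtask reads exactly one partition of a1 and one of a2. The round-robin rule with a per-topic starting offset, and the names `assign_partitions` and `topic_offset`, are hypothetical stand-ins; the Kafka connector's actual assignment rule may differ between Flink versions, but with equal partition and subtask counts any round-robin start gives one partition per subtask.

```python
"""Toy model of Kafka partitions spread over Flink source subtasks.

Assumption: partitions are assigned round-robin starting at a per-topic
offset. This is a hypothetical stand-in for the connector's real rule.
"""
from collections import defaultdict


def assign_partitions(topic_offset, num_partitions, parallelism):
    """Return {subtask_index: [partition, ...]} for one topic."""
    assignment = defaultdict(list)
    for partition in range(num_partitions):
        subtask = (topic_offset + partition) % parallelism
        assignment[subtask].append(partition)
    return dict(assignment)


task_managers = 3
slots_per_tm = 3
parallelism = task_managers * slots_per_tm  # 9 slots -> 9 source subtasks

# Hypothetical per-topic offsets; the real starting point depends on the connector.
stream1 = assign_partitions(topic_offset=2, num_partitions=9, parallelism=parallelism)
stream2 = assign_partitions(topic_offset=5, num_partitions=9, parallelism=parallelism)

for subtask in range(parallelism):
    print(subtask, stream1[subtask], stream2[subtask])
```

Note that the source subtask reading a partition is not necessarily the one that buffers the join state: for a keyed join the records are re-partitioned by the join key after the source.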

Re: Running Flink on Yarn

2018-12-24 Thread Andrey Zagrebin
I think the data buffered for the join will be distributed among threads by order_id (a1 and a2 will be internally keyed). Each thread will have non-shared window state (for 2 hours) for its own subset of order_ids. Slots will share some common JVM resources mentioned in the docs, and also access to the state DB, but not th
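A minimal Python sketch of why keying both inputs by order_id lets each thread keep private, non-shared state: the same order_id from a1 and a2 is routed to the same subtask, so the join can be done locally. The routing rule `hash(order_id) % parallelism`, the helper `route`, and the sample rows are all hypothetical simplifications, not Flink's actual routing code (that uses key groups).

```python
"""Toy model: both join inputs are hash-partitioned on order_id.

Assumption: route(key) = hash(key) % parallelism is a simplified stand-in
for Flink's key-group routing. Each subtask owns a private buffer.
"""
from collections import defaultdict

PARALLELISM = 9


def route(order_id, parallelism=PARALLELISM):
    # Same key -> same subtask, whichever input stream the row came from.
    return hash(order_id) % parallelism


# One private (non-shared) buffer per subtask: {order_id: [rows]}.
state = [defaultdict(list) for _ in range(PARALLELISM)]

# Hypothetical sample rows: (order_id, payload).
a1_rows = [("o-1", "PLACED"), ("o-2", "PLACED")]
a2_rows = [("o-1", "r-10"), ("o-2", "r-20"), ("o-3", "r-30")]

for order_id, to_state in a1_rows:
    state[route(order_id)][order_id].append(("a1", to_state))
for order_id, restaurant_id in a2_rows:
    state[route(order_id)][order_id].append(("a2", restaurant_id))

# Each subtask can join its own keys without reading other subtasks' state.
for subtask, buffered in enumerate(state):
    for order_id, rows in buffered.items():
        print(subtask, order_id, rows)
```

Python randomizes `hash()` for strings between runs, which changes which subtask a key lands on but not the property that both sides of a key meet in one place.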

Re: Running Flink on Yarn

2018-12-24 Thread Anil
I am using only a time-windowed join. Here's a sample query: SELECT a1.order_id, a2.order.restaurant_id FROM awz_s3_stream1 a1 INNER JOIN awz_s3_stream2 a2 ON CAST(a1.order_id AS VARCHAR) = a2.order_id AND a1.to_state = 'PLACED' AND a1.proctime BETWEEN a2.proctime - INTERVAL '2' HOUR AND a2.proct

Re: Running Flink on Yarn

2018-12-24 Thread Andrey Zagrebin
If you mean the time-windowed join documented here [1], I think it implicitly uses a keyed stream [2] where the key is the field in the equi-join predicate. The window state is also keyed [3] in this case. I also cc'd Timo and Piotr; they might add more to this topic. [1] https://ci.apache.org/projects/flink/

Re: Running Flink on Yarn

2018-12-24 Thread Anil
Thanks for the quick response, Andrey. I'm doing a SQL time-windowed join on a non-keyed stream, so all the threads in the various task slots of the same TM will share this state.

Re: Running Flink on Yarn

2018-12-24 Thread Andrey Zagrebin
Hi, I suppose you apply windowing to a keyed stream, or use a SQL time-windowed join? Globally windowed streams are non-parallel and are processed/stored in one slot. In the case of a keyed stream, the total range of key values is distributed among slots, and each slot processes/stores only a subrange of keys. Window st
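A Python sketch of "each slot processes/stores only a subrange of keys", modeled on the key-group scheme in Flink's `KeyGroupRangeAssignment`: keys hash into a fixed number of key groups, and the operator index `group * parallelism // max_parallelism` gives each subtask a contiguous block of groups. Assumptions: `zlib.crc32` stands in for the murmur hash Flink applies to `key.hashCode()`, and 128 key groups is an assumed value chosen for the example.

```python
"""Sketch of key-group style routing, modeled on Flink's KeyGroupRangeAssignment.

Assumptions: zlib.crc32 stands in for Flink's murmur hash of key.hashCode(),
and MAX_PARALLELISM (the number of key groups) is set to 128 for the example.
"""
import zlib

MAX_PARALLELISM = 128  # number of key groups (assumed value)
PARALLELISM = 9


def key_group(key: str) -> int:
    return zlib.crc32(key.encode("utf-8")) % MAX_PARALLELISM


def operator_index(group: int) -> int:
    # Integer scaling maps key groups 0..127 onto subtasks 0..8 in order,
    # so every subtask owns one contiguous range of key groups.
    return group * PARALLELISM // MAX_PARALLELISM


ranges = {}
for group in range(MAX_PARALLELISM):
    ranges.setdefault(operator_index(group), []).append(group)

for subtask, groups in sorted(ranges.items()):
    print(f"subtask {subtask}: key groups {groups[0]}..{groups[-1]}")

print("order 42 ->", operator_index(key_group("42")))
```

Hashing into a fixed number of key groups first, rather than directly into subtasks, means that changing the parallelism only moves whole key groups between subtasks.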

Running Flink on Yarn

2018-12-23 Thread Anil
I have a Flink (1.4.2) setup with YARN. I'm using the Flink YARN client to deploy my jobs to the YARN cluster. In the current setup, parallelism is mapped directly to the number of cores, with each parallel instance of the job running in one container. So for a parallelism of 9, there are 10 cont
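A small Python model of the container count, assuming one container per TaskManager plus one for the YARN ApplicationMaster that hosts the JobManager; that accounts for 10 containers at parallelism 9 with one slot per TaskManager. The function name `yarn_containers` is hypothetical, and the model ignores memory/vcore sizing of the containers.

```python
import math


def yarn_containers(parallelism: int, slots_per_tm: int) -> int:
    """Containers for one Flink-on-YARN deployment (simplified model).

    Assumption: one container per TaskManager, plus one container for the
    ApplicationMaster that hosts the JobManager.
    """
    task_managers = math.ceil(parallelism / slots_per_tm)
    return task_managers + 1


print(yarn_containers(9, 1))  # 10: current setup, 9 TaskManagers + 1 AM
print(yarn_containers(9, 3))  # 4: 3 TaskManagers with 3 slots each + 1 AM
```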

Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-25 Thread Till Rohrmann
Hi Henry, since version 1.5 you don't need to specify the number of TaskManagers to start, because the system will figure this out. Moreover, in versions 1.5.x and 1.6.x it is recommended to set the number of slots per TaskManager to 1, since we did not properly support TaskManagers with multiple task slots.
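A rough Python estimate of how many TaskManagers the system would settle on, assuming default slot sharing (one slot can hold one subtask of every operator, so the job needs as many slots as its widest operator) and TaskManagers = ceil(slots / slots per TaskManager). This describes only the steady state, not the expand-then-converge behaviour reported in this thread. The two parallelism-16 tasks mirror the configs from the original question, and the helper names are hypothetical.

```python
import math


def slots_needed(operator_parallelisms):
    # With default slot sharing, the job needs as many slots as its
    # widest operator (assumption: a single slot sharing group).
    return max(operator_parallelisms)


def task_managers_needed(operator_parallelisms, slots_per_tm):
    return math.ceil(slots_needed(operator_parallelisms) / slots_per_tm)


job = [16, 16]  # hypothetical: both tasks run at parallelism.default = 16
print(task_managers_needed(job, slots_per_tm=8))  # 2, matching -yn 2
print(task_managers_needed(job, slots_per_tm=1))  # 16 with 1 slot per TaskManager
```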

Re: Task manager count goes the expand then converge process when running flink on YARN

2018-10-24 Thread vino yang
Hi Henry, the phenomenon you describe does exist; it is a bug, but I can't remember its JIRA number. Thanks, vino. 徐涛 wrote on Wed, Oct 24, 2018 at 11:27 PM: > Hi experts > I am running flink job on YARN in job cluster mode, the job is divided > into 2 tasks, the following are some configs of the job: > pa

Task manager count goes the expand then converge process when running flink on YARN

2018-10-24 Thread 徐涛
Hi experts, I am running a Flink job on YARN in job cluster mode. The job is divided into 2 tasks; the following are some of its configs: parallelism.default => 16, taskmanager.numberOfTaskSlots => 8, -yn => 2. When the program starts, I found that the count

Running flink on YARN

2017-10-12 Thread Navneeth Krishnan
Hello, I'm running Flink on AWS EMR and I would like to know how I can pass a custom log4j properties file. I changed the log4j.properties file in the Flink conf directory, but the changes don't seem to be reflected. Thanks. I'm using the command below to start my Flink job. > flink run -m yar