Hi Mingliang,

Regarding your first question: I answered it on Stack Overflow [1]. Hope it helps.

Best,
Hequn

[1] https://stackoverflow.com/questions/51832577/what-may-probably-cause-large-alignment-duration-for-flink-job

On Tue, Aug 14, 2018 at 10:16 AM, 祁明良 <m...@xiaohongshu.com> wrote:

> Thank you for this great answer, Fabian.
>
> Regarding the YARN JVM heap size, I tried changing
>
>     containerized.heap-cutoff-ratio: 0.25
>
> and it seems to work, but the actual memory needed for RocksDB still
> looks like a black box to me. There is already a JIRA ticket about this
> problem [1], created last year and still open. All I can do is keep
> increasing this value until YARN no longer kills my TaskManager because
> of memory usage :)
>
> By the way, my rough calculation of RocksDB memory on each TM is:
>
>     num of slots per task * num of stateful operators (including source
>     and sink?) * (block cache size + write buffer size)
>
> I bet it's not correct.
>
> Best,
> Mingliang
>
> [1] https://issues.apache.org/jira/browse/FLINK-7289
>
> On 13 Aug 2018, at 11:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Mingliang,
>
> let me answer your second question first:
>
> > Another question is about the alignment buffer, I thought it was only
> > used for multiple input stream cases. But for keyed process function,
> > what is actually aligned?
>
> When a task sends records to multiple downstream tasks (tasks, not
> operators!) due to a broadcast or partition/keyBy/shuffle connection, the
> task broadcasts each checkpoint barrier to all of its receiving tasks.
> Therefore, each task that receives records from multiple tasks will
> receive multiple checkpoint barriers. (Checkpoint barriers behave
> similarly to watermarks in this regard.)
> In order to provide exactly-once state consistency, a task must buffer
> records from each input connection that has already forwarded a barrier
> until barriers from all input connections have been received and the
> state checkpoint was initiated.
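
The buffering rule described just above can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not Flink's implementation: Event, Record, Barrier and AligningTask are hypothetical names invented for this illustration, and input channels are modelled as plain integers.

```scala
import scala.collection.mutable

// Hypothetical event types for illustration only; these are not Flink classes.
sealed trait Event
final case class Record(channel: Int, value: String) extends Event
final case class Barrier(channel: Int, checkpointId: Long) extends Event

// Toy model of one task that aligns checkpoint barriers across its inputs.
final class AligningTask(numChannels: Int) {
  // Channels that have already delivered the current barrier.
  private val blocked = mutable.Set.empty[Int]
  // Records held back while alignment is in progress.
  private val buffered = mutable.Queue.empty[Record]
  // Records applied to state, in processing order.
  val processed = mutable.ArrayBuffer.empty[String]
  // Checkpoints that have been triggered.
  val checkpoints = mutable.ArrayBuffer.empty[Long]

  def receive(event: Event): Unit = event match {
    case r @ Record(ch, _) if blocked.contains(ch) =>
      // This record arrived after the barrier on its channel: hold it back.
      buffered.enqueue(r)
    case Record(_, v) =>
      // The barrier on this channel has not arrived yet: process normally.
      processed += v
    case Barrier(ch, id) =>
      blocked += ch
      if (blocked.size == numChannels) {
        // Last barrier received: alignment ends and the snapshot is taken.
        checkpoints += id
        blocked.clear()
        // Release everything that was buffered during alignment.
        while (buffered.nonEmpty) processed += buffered.dequeue().value
      }
  }

  def bufferedCount: Int = buffered.size
}
```

With two channels, feeding Record(0, "a"), Barrier(0, 1), Record(0, "b"), Record(1, "c"), Barrier(1, 1) processes "a" and "c" immediately, holds "b" until the second barrier arrives, and then takes checkpoint 1. The longer the barrier on channel 1 is delayed, the longer the alignment lasts and the more records pile up in the buffer.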
>
> What does this mean for the long checkpoint alignment that you observe?
> Checkpoint alignment starts when the first barrier is received and ends
> when the last barrier is received.
> Hence, it seems as if one task manager receives some barrier(s) later than
> the other nodes, probably because it is more heavily loaded.
> The fact that all affected tasks run on the same TM and that you mentioned
> backpressure hints at this, because TMs multiplex the connections of all
> tasks.
>
> Regarding the memory configuration question, I am not sure if there is a
> way to override the JVM heap size on YARN. Maybe others can answer this
> question.
>
> Best,
> Fabian
>
> 2018-08-12 18:36 GMT+02:00 祁明良 <m...@xiaohongshu.com>:
>
>> Hi all,
>>
>> I have several questions regarding checkpoints. The background is that I'm
>> using a ProcessFunction keyed by user_id, which works roughly as follows:
>>
>>     inputStream
>>       .keyBy(x => getUserKey(x))
>>       .process(...)
>>
>> It runs on YARN with 40 TMs * 2 slots each. When I look at the checkpoint
>> metrics, only a small number of subtasks have a large "alignment
>> buffered/duration", and it looks like the 2 slots on the same TM are
>> either both high or both low. What could cause this?
>>
>> 1. Maybe data skew, but I see the amount of data is almost the same.
>> 2. Or the network?
>> 3. The system is under back pressure, but I don't understand why only
>>    about 4 out of 80 subtasks behave like this.
>>
>> Another question is about the alignment buffer. I thought it was only used
>> for multiple input stream cases. But for a keyed process function, what is
>> actually aligned?
>>
>> The last question is about tuning RocksDB. I tried to assign some memory
>> to the write buffer and block cache, and the doc says "typically by
>> decreasing the JVM heap size of the TaskManagers by the same amount", and
>> for the TaskManager heap size it says "On YARN setups, this value is
>> automatically configured to the size of the TaskManager's YARN container,
>> minus a certain tolerance value." So it looks like I should decrease the
>> TaskManager heap, but on YARN the value is set automatically, so what
>> should I do?
>>
>> Best,
>>
>> Mingliang
>>
>> This communication may contain privileged or other confidential
>> information of Red. If you have received it in error, please advise the
>> sender by reply e-mail and immediately delete the message and any
>> attachments without copying or disclosing the contents. Thank you.
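
For the RocksDB memory question, the arithmetic discussed in this thread can be written down as a small sketch. It rests on several assumptions: a simplified model in which the memory YARN keeps outside the JVM is the larger of a minimum cutoff and container size * containerized.heap-cutoff-ratio; the rough per-TM estimate quoted in the thread, which its own author doubts; and "slots per task" read as slots per TaskManager. The object and parameter names are hypothetical, and the cutoff also has to cover other non-heap memory, so this is a sanity check rather than a sizing rule.

```scala
object RocksDbMemoryEstimate {
  // Memory (MB) kept outside the JVM in a YARN container.
  // Simplified assumption: cutoff = max(minCutoffMb, containerMb * cutoffRatio).
  def cutoffMb(containerMb: Long, cutoffRatio: Double, minCutoffMb: Long): Long =
    math.max(minCutoffMb, (containerMb * cutoffRatio).toLong)

  // Rough per-TaskManager RocksDB estimate as quoted in the thread:
  // slots * stateful operators * (block cache + write buffer).
  // The thread itself questions this formula; treat the result as a guess.
  def roughRocksDbMb(slotsPerTm: Int,
                     statefulOperators: Int,
                     blockCacheMb: Long,
                     writeBufferMb: Long): Long =
    slotsPerTm.toLong * statefulOperators * (blockCacheMb + writeBufferMb)

  // True if the rough estimate fits into the memory left outside the JVM.
  def fitsInCutoff(rocksDbMb: Long, availableCutoffMb: Long): Boolean =
    rocksDbMb <= availableCutoffMb
}
```

With made-up numbers, a 4096 MB container at a ratio of 0.25 and a 600 MB minimum gives cutoffMb(4096, 0.25, 600) = 1024, while 2 slots, 1 stateful operator, a 256 MB block cache and a 64 MB write buffer give roughRocksDbMb(2, 1, 256, 64) = 640, so the estimate fits. If YARN still kills the container with such numbers, the estimate is too low, which matches the experience reported in the thread.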