Hi mingliang,

Considering your first question. I answered it on stack overflow[1].
Hope it helps.

Best, Hequn

[1]
https://stackoverflow.com/questions/51832577/what-may-probably-cause-large-alignment-duration-for-flink-job


On Tue, Aug 14, 2018 at 10:16 AM, 祁明良 <m...@xiaohongshu.com> wrote:

> Thank you for this great answer, Fabian.
>
> Regarding the yarn JVM heap size, I tried to change
> containerized.heap*-*cutoff*-**ratio**:* *0.25*
> And it somehow looks like working, but the actually memory needed for
> rocksdb still looks like a blackbox  to me. I see there’s already a JIRA
> ticket talking about this problem[1], created last year and still open yet.
> What I can do is just keep enlarging this value until YARN don’t kill my
> TaskManager because of memory usage:)
>
> By the way, my rough calculation of rocksdb memory on each TM is like
> num of slots per task * num of stateful operators(including source and
> sink?) * (block cache size + write buffer size)
>
> I bet it’s not correct..
>
> Best,
> Mingliang
>
> [1] https://issues.apache.org/jira/browse/FLINK-7289
>
> On 13 Aug 2018, at 11:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Mingliang,
>
> let me answer your second question first:
>
> > Another question is about the alignment buffer, I thought it was only
> used for multiple input stream cases. But for keyed process function , what
> is actually aligned?
>
> When a task sends records to multiple downstream tasks (task not
> operators!) due to a broadcast or partition/keyBy/shuffle connection, the
> task broadcasts each checkpoint barrier to all of its receiving tasks.
> Therefore, each task that receives records from multiple tasks will
> receive multiple checkpoint barriers. (Checkpoint barriers behave similar
> to watermarks in this regard)
> In order to provide exactly-once state consistency, a task must buffer
> records from input connection that forwarded a barrier until barriers from
> all input connections have been received and the state checkpoint was
> initiated.
>
> What does this mean for the long checkpoint alignment that you observe?
> Checkpoint alignment starts when the first barrier is received and ends
> when the last barrier is received.
> Hence, it seems as if one task manager receives some barrier(s) later than
> the other nodes, probably because it is more heavily loaded.
> The fact that all affected tasks run on the same TM and that you mentioned
> backpressure is a hint for that because TMs multiplex the connection of all
> tasks.
>
> Regarding the memory configuration question, I am not sure if there is a
> way to override the JVM heap size on YARN. Maybe others can answer this
> question.
>
> Best,
> Fabian
>
> 2018-08-12 18:36 GMT+02:00 祁明良 <m...@xiaohongshu.com>:
>
>> Hi all,
>>
>> I have several questions regarding the checkpoint. The background is I'm
>> using a ProcessFunction keyed by user_id somehow works like following:
>>
>> inputStream
>>   .keyBy(x => getUserKey(x))
>>   .process(...)
>>
>> It runs on yarn with 40 TMs * 2 slots each, when I look at the checkpoint 
>> metrics, only a small number of subtasks have a large "alignment 
>> buffered/duration", and looks like either all the 2 slots on the same TM are 
>> both high or both low.  What may probably cause this?
>>
>>
>>    1. maybe data skew, but I see the amount of data is almost same
>>    2. or network?
>>    3. The system is under back pressure, but I don't understand why only 
>> like 4 out of 80 subtasks perform like this.
>>
>> Another question is about the alignment buffer, I thought it was only used 
>> for multiple input stream cases. But for keyed process function , what is 
>> actually aligned?
>>
>> The last question is about tuning rocksdb, I try to assign some memory to 
>> writebuffer and block cache, and the doc says "typically by decreasing the 
>> JVM heap size of the TaskManagers by the same amount" , and taskmanager heap 
>> size is "On YARN setups, this value is automatically configured to the size 
>> of the TaskManager's YARN container, minus a certain tolerance value." This 
>> looks like I should decrease the taskmanager heap and the value is set by 
>> YARN automatically, so what should I do?
>>
>> Best,
>>
>> Mingliang
>>
>>
>>
>>
>> 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部
>> 分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件!
>> This communication may contain privileged or other confidential
>> information of Red. If you have received it in error, please advise the
>> sender by reply e-mail and immediately delete the message and any
>> attachments without copying or disclosing the contents. Thank you.
>>
>>
>
>
> 本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(
> 包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件!
> This communication may contain privileged or other confidential
> information of Red. If you have received it in error, please advise the
> sender by reply e-mail and immediately delete the message and any
> attachments without copying or disclosing the contents. Thank you.
>
>

Reply via email to