Hi,

You can try to filter NULL values with an explicit condition like "xxxx is
not NULL".

Best,
Kurt


On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <eternalsunshine2...@gmail.com>
wrote:

> Thank you both for the suggestions.
> I did a bit more analysis using UI and identified at least one
> problem that's occurring with the job rn. Going to fix it first and then
> take it from there.
>
> *Problem that I identified:*
> I'm running with 26 parallelism. For the checkpoints that are expiring,
> one of a JOIN operation is finishing at 25/26 (96%) progress with
> corresponding SubTask:21 has "n/a" value. For the same operation I also
> noticed that the load is distributed poorly with heavy load being fed to
> SubTask:21.
> My guess is bunch of null values are happening for this JOIN operation and
> being put into the same task.
> Currently I'm using SQL query which gives me limited control on handling
> null values so I'll try to programmatically JOIN and see if I can avoid
> JOIN operation whenever the joining value is null. This should help with
> better load distribution across subtasks. And may also fix expiring
> checkpointing issue.
>
> Thanks for the guidance.
> Eva.
>
> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
>> Hi
>>
>> For expired checkpoint, you can find something like " Checkpoint xxx of
>> job xx expired before completing" in jobmanager.log, then you can go to the
>> checkpoint UI to find which tasks did not ack, and go to these tasks to see
>> what happened.
>>
>> If checkpoint was been declined, you can find something like "Decline
>> checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log, in this
>> case, you can go to the task directly to find out why the checkpoint failed.
>>
>> Best,
>> Congxian
>>
>>
>> Yun Tang <myas...@live.com> 于2020年1月10日周五 下午7:31写道:
>>
>>> Hi Eva
>>>
>>> If checkpoint failed, please view the web UI or jobmanager log to see
>>> why checkpoint failed, might be declined by some specific task.
>>>
>>> If checkpoint expired, you can also access the web UI to see which tasks
>>> did not respond in time, some hot task might not be able to respond in
>>> time. Generally speaking, checkpoint expired is mostly caused by back
>>> pressure which led the checkpoint barrier did not arrive in time. Resolve
>>> the back pressure could help the checkpoint finished before timeout.
>>>
>>> I think the doc of monitoring web UI for checkpoint [1] and back
>>> pressure [2] could help you.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Eva Eva <eternalsunshine2...@gmail.com>
>>> *Sent:* Friday, January 10, 2020 10:29
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Please suggest helpful tools
>>>
>>> Hi,
>>>
>>> I'm running Flink job on 1.9 version with blink planner.
>>>
>>> My checkpoints are timing out intermittently, but as state grows they
>>> are timing out more and more often eventually killing the job.
>>>
>>> Size of the state is large with Minimum=10.2MB and Maximum=49GB (this
>>> one is accumulated due to prior failed ones), Average=8.44GB.
>>>
>>> Although size is huge, I have enough space on EC2 instance in which I'm
>>> running job. I'm using RocksDB for checkpointing.
>>>
>>> *Logs does not have any useful information to understand why checkpoints
>>> are expiring/failing, can someone please point me to tools that can be used
>>> to investigate and understand why checkpoints are failing.*
>>>
>>> Also any other related suggestions are welcome.
>>>
>>>
>>> Thanks,
>>> Reva.
>>>
>>

Reply via email to