Hi,

You can try to filter out NULL values with an explicit condition like
"xxxx is not NULL".
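For example, a minimal sketch of that idea with the Blink planner on 1.9
(the "orders"/"customers" tables, their columns, and their registration are
placeholders, not your actual schema):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class NullSafeJoinSketch {
    public static void main(String[] args) {
        // Blink planner in streaming mode, matching the setup described below.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // "orders" and "customers" are hypothetical tables assumed to be
        // registered elsewhere. Rows whose join key is NULL can all end up
        // on the same subtask of the join, which matches the skew described
        // below; filtering them out before the join avoids that.
        Table joined = tEnv.sqlQuery(
                "SELECT o.order_id, c.customer_name "
              + "FROM orders o "
              + "JOIN customers c ON o.customer_id = c.customer_id "
              + "WHERE o.customer_id IS NOT NULL");
    }
}

If you stay with the pure SQL route, adding the same IS NOT NULL predicate
on the join key to your existing query should already keep the NULL rows
from being shuffled into the join.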
Best,
Kurt

On Sat, Jan 11, 2020 at 4:10 AM Eva Eva <eternalsunshine2...@gmail.com> wrote:

> Thank you both for the suggestions.
> I did a bit more analysis using the UI and identified at least one
> problem that's occurring with the job right now. I'm going to fix it
> first and then take it from there.
>
> *Problem that I identified:*
> I'm running with a parallelism of 26. For the checkpoints that are
> expiring, one of the JOIN operations is finishing at 25/26 (96%)
> progress, with the corresponding SubTask 21 showing an "n/a" value.
> For the same operation I also noticed that the load is distributed
> poorly, with a heavy load being fed to SubTask 21.
> My guess is that a bunch of null values are hitting this JOIN
> operation and being routed to the same subtask.
> Currently I'm using a SQL query, which gives me limited control over
> handling null values, so I'll try to JOIN programmatically and see if
> I can skip the JOIN operation whenever the joining value is null.
> This should help with better load distribution across subtasks, and
> may also fix the expiring checkpoint issue.
>
> Thanks for the guidance.
> Eva.
>
> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
>> Hi,
>>
>> For an expired checkpoint, you can find something like "Checkpoint
>> xxx of job xx expired before completing" in jobmanager.log. Then you
>> can go to the checkpoint UI to find which tasks did not ack, and go
>> to those tasks to see what happened.
>>
>> If a checkpoint was declined, you can find something like "Decline
>> checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log; in
>> this case, you can go to the task directly to find out why the
>> checkpoint failed.
>>
>> Best,
>> Congxian
>>
>>
>> Yun Tang <myas...@live.com> wrote on Friday, January 10, 2020 at 7:31 PM:
>>
>>> Hi Eva,
>>>
>>> If the checkpoint failed, please view the web UI or the jobmanager
>>> log to see why it failed; it might have been declined by some
>>> specific task.
>>>
>>> If the checkpoint expired, you can also access the web UI to see
>>> which tasks did not respond in time; some hot task might not be
>>> able to respond in time. Generally speaking, an expired checkpoint
>>> is mostly caused by back pressure, which keeps the checkpoint
>>> barrier from arriving in time. Resolving the back pressure could
>>> help the checkpoint finish before the timeout.
>>>
>>> I think the docs on checkpoint monitoring in the web UI [1] and
>>> back pressure monitoring [2] could help you.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>
>>> Best,
>>> Yun Tang
>>> ------------------------------
>>> *From:* Eva Eva <eternalsunshine2...@gmail.com>
>>> *Sent:* Friday, January 10, 2020 10:29
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Please suggest helpful tools
>>>
>>> Hi,
>>>
>>> I'm running a Flink job on version 1.9 with the Blink planner.
>>>
>>> My checkpoints are timing out intermittently, but as the state
>>> grows they are timing out more and more often, eventually killing
>>> the job.
>>>
>>> The size of the state is large, with Minimum=10.2 MB, Maximum=49 GB
>>> (this one accumulated due to prior failed checkpoints), and
>>> Average=8.44 GB.
>>>
>>> Although the size is huge, I have enough space on the EC2 instance
>>> on which I'm running the job. I'm using RocksDB for checkpointing.
>>>
>>> *The logs do not have any useful information to understand why
>>> checkpoints are expiring/failing. Can someone please point me to
>>> tools that can be used to investigate and understand why
>>> checkpoints are failing?*
>>>
>>> Also, any other related suggestions are welcome.
>>>
>>>
>>> Thanks,
>>> Reva.
>>>
>>
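Following up on the back-pressure and checkpoint-expiry points above, a
minimal sketch (the interval, timeout, and checkpoint path are illustrative
values only, not taken from this job) of the settings commonly tuned in
this situation: a longer checkpoint timeout, a minimum pause between
checkpoints, and incremental RocksDB checkpoints so that only the delta of
a large state is uploaded.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s, allow up to 10 min before a checkpoint
        // expires, and leave at least 30 s between checkpoints so the job
        // can catch up under back pressure.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointTimeout(600_000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

        // Incremental RocksDB checkpoints upload only what changed since
        // the previous checkpoint; the path below is a placeholder.
        StateBackend backend =
                new RocksDBStateBackend("file:///path/to/checkpoints", true);
        env.setStateBackend(backend);
    }
}

None of this removes the back pressure itself; fixing the skewed JOIN, as
discussed above, is still the main lever.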