[ 
https://issues.apache.org/jira/browse/FLINK-24539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430022#comment-17430022
 ] 

Piotr Nowojski commented on FLINK-24539:
----------------------------------------

Hi [~vmaster]. In the future I recommend first asking on [the user mailing 
list|https://flink.apache.org/community.html#mailing-lists], as this is the 
fastest way to get help.

I can't say whether it's normal for {{ChangelogNormalize}} to take such a long 
time to initialize (I presume it's busy recovering its state, unless you are 
using unaligned checkpoints). Assuming this is normal, I can't see anything 
wrong in this particular task manager log file. There was no error here; 
something else caused the job to fail or fail over, which led to the 
cancellation of the tasks running on this task manager. Maybe it was an error 
on another task manager, or some error on the job manager itself (too many 
consecutive checkpoint failures?). It would help if you could look through the 
job manager log for the root cause of the failure.
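
As a rough aid for that search, here is a minimal sketch that scans a log file for lines hinting at a root cause. It assumes the job manager log uses the same line layout as the task manager log quoted below (timestamp, level, logger, {{[] -}}, message). The function names and the list of failure-related states ({{FAILED}}, {{FAILING}}, {{RESTARTING}}) are illustrative assumptions, not something taken from this issue.

```python
import re
from typing import Iterable, List, Tuple

# Assumed line layout, modelled on the task manager log in this issue:
# "2021-10-14 12:38:07,581 INFO  <logger>   [] - <message>"
LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<level>[A-Z]+)\s+(?P<logger>\S+)\s+\[\] - (?P<msg>.*)$"
)

# Transitions into a failure-related state (the state names are assumptions).
FAILURE_RE = re.compile(
    r"switched from (?:state )?\w+ to (FAILED|FAILING|RESTARTING)\b"
)


def find_failure_candidates(lines: Iterable[str]) -> List[Tuple[str, str, str]]:
    """Return (timestamp, level, message) for lines that may point at a root cause."""
    hits = []
    for line in lines:
        match = LINE_RE.match(line.rstrip("\n"))
        if match is None:
            continue  # continuation lines (e.g. stack traces) are skipped
        if match["level"] == "ERROR" or FAILURE_RE.search(match["msg"]):
            hits.append((match["ts"], match["level"], match["msg"]))
    return hits


def scan_file(path: str) -> List[Tuple[str, str, str]]:
    """Scan one log file; the path is supplied by the caller."""
    with open(path, encoding="utf-8", errors="replace") as handle:
        return find_failure_candidates(handle)
```

Calling {{scan_file}} on the job manager log returns the candidate lines in file order. The earliest hit is usually the best place to start reading, since later failures are often just consequences of the first one.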

> ChangelogNormalize operator tooks too long time to INITIALIZING until failed
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-24539
>                 URL: https://issues.apache.org/jira/browse/FLINK-24539
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 1.13.1
>         Environment: Flink version :1.13.1
> TaskManager memory:
> !image-2021-10-14-13-36-56-899.png|width=578,height=318!
> JobManager memory:
> !image-2021-10-14-13-37-51-445.png|width=578,height=229!
>            Reporter: vmaster.cc
>            Priority: Major
>         Attachments: image-2021-10-14-13-19-08-215.png, 
> image-2021-10-14-13-36-56-899.png, image-2021-10-14-13-37-51-445.png, 
> image-2021-10-14-14-13-13-370.png, image-2021-10-14-14-15-40-101.png, 
> image-2021-10-14-14-16-33-080.png, 
> taskmanager_container_e11_1631768043929_0012_01_000004_log.txt
>
>
> I'm using Debezium to produce CDC events from MySQL. Because it provides 
> at-least-once delivery, I must set the config 
> 'table.exec.source.cdc-events-duplicate=true'.
> But when my task goes down for some unknown reason, the Flink task restart 
> always fails. I found that the ChangelogNormalize operator takes too long in 
> the INITIALIZING stage.
>  
> The screenshot and log fragment are as follows:
> !image-2021-10-14-13-19-08-215.png|width=567,height=293!
>  
> {code:java}
> 2021-10-14 12:32:33,660 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /data3/yarn/nm/usercache/flink/appcache/application_1631768043929_0012/flink-io-f31735c3-e726-4c49-89a5-916670809b7a/job_7734977994a6a10f7cc784d50e4a1a34_op_KeyedProcessOperator_dc2290bb6f8f5cd2bd425368843494fe__1_1__uuid_6cbbe6ae-f43e-4d2a-b1fb-f0cb71f257af.
> 2021-10-14 12:32:33,662 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - GroupAggregate(groupBy=[teacher_id, create_day], select=[teacher_id, create_day, SUM_RETRACT($f2) AS teacher_courseware_count]) -> Calc(select=[teacher_id, create_day, CAST(teacher_courseware_count) AS teacher_courseware_count]) -> NotNullEnforcer(fields=[teacher_id, create_day]) (1/1)#143 (9cca3ef1293cc6364698381bbda93998) switched from INITIALIZING to RUNNING.
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Ignoring checkpoint aborted notification for non-running task ChangelogNormalize(key=[c_id]) -> Calc(select=[c_author_id AS teacher_id, DATE_FORMAT(c_create_time, _UTF-16LE'yyyy-MM-dd') AS create_day, IF((c_state = 10), 1, 0) AS $f2], where=[((c_is_group = 0) AND (c_author_id <> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) (1/1)#143.
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd) switched from RUNNING to CANCELING.
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
> 2021-10-14 12:38:07,583 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22).
> 2021-10-14 12:38:07,583 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22) switched from RUNNING to CANCELING.
> {code}
>  
> Notes:
> 1. The table holds a large amount of data, up to 500 million.
> 2. Because the amount of data is very large, the RocksDB state backend is used.
> 3. For more environment details, see the next section; the full log is attached.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
