[ https://issues.apache.org/jira/browse/FLINK-24539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430063#comment-17430063 ]
Timo Walther commented on FLINK-24539:
--------------------------------------

[~vmaster] {{ChangelogNormalize}} stores the entire input table in state in order to normalize the incoming change messages. It is a very expensive operator. Are all 500 million rows processed by a single node, or how is your data partitioned?
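For reference, below is a minimal sketch (Flink 1.13 Table API) of how this operator typically enters such a pipeline: setting {{table.exec.source.cdc-events-duplicate=true}} on a CDC source with a declared primary key makes the planner add a {{ChangelogNormalize}} step keyed by that primary key. The table name, schema and connector options are made up for illustration and are not the reporter's actual job; the example also assumes the Kafka connector and Debezium JSON format jars are on the classpath.

{code:java}
// Illustrative only: hypothetical "orders" CDC table, not the reporter's schema.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcDedupPlanExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Debezium delivers at-least-once, so ask the planner to deduplicate
        // change events; this option is what brings in ChangelogNormalize.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.cdc-events-duplicate", "true");

        // Deduplication requires a PRIMARY KEY on the CDC source table.
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  order_id BIGINT," +
                "  amount   DECIMAL(10, 2)," +
                "  PRIMARY KEY (order_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format' = 'debezium-json'" +
                ")");

        // The physical plan should now contain ChangelogNormalize(key=[order_id]),
        // a keyed, stateful operator that materializes the latest row per key.
        System.out.println(tEnv.explainSql("SELECT SUM(amount) FROM orders"));
    }
}
{code}

In the printed plan the operator shows up as {{ChangelogNormalize(key=[order_id])}}, analogous to the {{ChangelogNormalize(key=[c_id])}} entry in the log below. Because it materializes the latest row per key, its state grows with the number of distinct keys, which is why it becomes so expensive for a 500-million-row table.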
> ChangelogNormalize operator takes too long in INITIALIZING until it fails
> --------------------------------------------------------------------------
>
>                 Key: FLINK-24539
>                 URL: https://issues.apache.org/jira/browse/FLINK-24539
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Task, Table SQL / Runtime
>    Affects Versions: 1.13.1
>         Environment: Flink version: 1.13.1
> TaskManager memory:
> !image-2021-10-14-13-36-56-899.png|width=578,height=318!
> JobManager memory:
> !image-2021-10-14-13-37-51-445.png|width=578,height=229!
>            Reporter: vmaster.cc
>            Priority: Major
>         Attachments: image-2021-10-14-13-19-08-215.png, image-2021-10-14-13-36-56-899.png, image-2021-10-14-13-37-51-445.png, image-2021-10-14-14-13-13-370.png, image-2021-10-14-14-15-40-101.png, image-2021-10-14-14-16-33-080.png, taskmanager_container_e11_1631768043929_0012_01_000004_log.txt
>
>
> I'm using Debezium to produce CDC events from MySQL. Because its delivery is at-least-once, I have to set the config 'table.exec.source.cdc-events-duplicate=true'.
> But when some unknown problem brought the job down, the Flink job kept failing to restart. I found that the ChangelogNormalize operator spends too long in the INITIALIZING stage.
>
> Screenshot and log fragment are as follows:
> !image-2021-10-14-13-19-08-215.png|width=567,height=293!
>
> {code:java}
> 2021-10-14 12:32:33,660 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Finished building RocksDB keyed state-backend at /data3/yarn/nm/usercache/flink/appcache/application_1631768043929_0012/flink-io-f31735c3-e726-4c49-89a5-916670809b7a/job_7734977994a6a10f7cc784d50e4a1a34_op_KeyedProcessOperator_dc2290bb6f8f5cd2bd425368843494fe__1_1__uuid_6cbbe6ae-f43e-4d2a-b1fb-f0cb71f257af.
> 2021-10-14 12:32:33,662 INFO  org.apache.flink.runtime.taskmanager.Task [] - GroupAggregate(groupBy=[teacher_id, create_day], select=[teacher_id, create_day, SUM_RETRACT($f2) AS teacher_courseware_count]) -> Calc(select=[teacher_id, create_day, CAST(teacher_courseware_count) AS teacher_courseware_count]) -> NotNullEnforcer(fields=[teacher_id, create_day]) (1/1)#143 (9cca3ef1293cc6364698381bbda93998) switched from INITIALIZING to RUNNING.
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task ChangelogNormalize(key=[c_id]) -> Calc(select=[c_author_id AS teacher_id, DATE_FORMAT(c_create_time, _UTF-16LE'yyyy-MM-dd') AS create_day, IF((c_state = 10), 1, 0) AS $f2], where=[((c_is_group = 0) AND (c_author_id <> _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) (1/1)#143.
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd) switched from RUNNING to CANCELING.
> 2021-10-14 12:38:07,581 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (2/2)#143 (cc25f9ae49c4db01ab40ff103fae43fd).
> 2021-10-14 12:38:07,583 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22).
> 2021-10-14 12:38:07,583 INFO  org.apache.flink.runtime.taskmanager.Task [] - Sink: Sink(table=[default_catalog.default_database.t_flink_school_teacher_courseware_count], fields=[teacher_id, create_day, teacher_courseware_count]) (1/2)#143 (5419f41a3f0cc6c2f3f4c82c87f4ae22) switched from RUNNING to CANCELING.
> {code}
>
> Attention:
> 1. The table has a large amount of data, up to 500 million rows.
> 2. Because the data volume is so large, the RocksDB state backend is used (a configuration sketch follows after this report).
> 3. For more environment info, see the next section; for the full log, see the attachment.
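Regarding point 2 above: a minimal sketch of the usual RocksDB state backend setup on Flink 1.13. The checkpoint interval and storage URI are placeholders, not the reporter's actual configuration.

{code:java}
// Placeholder values throughout; not the reporter's actual configuration.
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keyed state backend; 'true' enables incremental checkpoints,
        // the usual choice when keyed state (such as ChangelogNormalize's
        // per-key materialization) is far larger than the heap.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Checkpoint every 60s to a durable location (placeholder URI).
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

        // ... build the Table/SQL pipeline on top of this environment ...
        env.execute("cdc-aggregation-job");
    }
}
{code}

Even with RocksDB, restoring a task still has to download and reload the operator state from the checkpoint, so a very large {{ChangelogNormalize}} state can keep the task in INITIALIZING for a long time.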