[
https://issues.apache.org/jira/browse/HUDI-3661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641823#comment-17641823
]
lei w commented on HUDI-3661:
-----------------------------
Hi [~hd zhou], [~danny0405]. In our cluster, we removed the async executor to
solve this problem, but that led to checkpoint timeouts. I have an idea to
avoid this.
Some details are as follows:
1. CompactionPlanOperator always sends a CompactionPlanEvent downstream in
notifyCheckpointComplete.
If there is no compaction plan, it sends a CompactionPlanEvent with a null
compactionOperation and broadcasts this event to the downstream tasks by
sending more events than the compaction task parallelism.
2. Add a CompactionCommitEventList (a thread-safe list) to CompactFunction.
3. When a compaction task receives an event from upstream:
3.1: If the CompactionPlanEvent has a null compactionOperation, go to 3.3.
3.2: If the CompactionPlanEvent has a non-null compactionOperation, submit
the compaction to AsyncCompactionExecutor, then go to 3.3.
3.3: Poll all CompactionCommitEvents from CompactionCommitEventList and
send them downstream.
4. When a compaction finishes, AsyncCompactionExecutor puts its
CompactionCommitEvent into CompactionCommitEventList. The event stays there
until the next CompactionPlanEvent arrives, and is then sent downstream.
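A minimal sketch of steps 2 to 4, to make the threading explicit. This is not
Hudi code: BufferedCompactSketch, processElement and awaitPendingCompactions
are hypothetical names, strings stand in for the real event classes, and a
java.util.function.Consumer stands in for the Flink collector. The point is
that the background thread only writes to the thread-safe buffer, while the
downstream output is touched only by the thread that processes plan events.
The broadcast of empty events in step 1 is not modeled here.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for the compaction task; not an actual Hudi class. */
class BufferedCompactSketch {
    // Step 2: thread-safe buffer that holds finished commit events.
    private final ConcurrentLinkedQueue<String> commitEvents = new ConcurrentLinkedQueue<>();
    // Stand-in for the async compaction executor.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Stand-in for the collector; assumed to be unsafe for concurrent use.
    private final Consumer<String> downstream;

    BufferedCompactSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** Step 3: runs on the task thread. A null operation means "no compaction plan". */
    void processElement(String compactionOperation) {
        if (compactionOperation != null) {
            // Steps 3.2 and 4: compact in the background; the worker thread
            // only appends to the buffer and never calls the downstream output.
            executor.execute(() -> commitEvents.add("commit:" + compactionOperation));
        }
        // Step 3.3: forward everything finished so far, on the task thread.
        String event;
        while ((event = commitEvents.poll()) != null) {
            downstream.accept(event);
        }
    }

    /** Test helper: stop accepting work and wait for running compactions to finish. */
    void awaitPendingCompactions() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this sketch, a result that finishes after the drain in processElement waits
in the buffer until the next call, which is why the empty plan events of step 1
are needed to keep results flowing.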
This idea delays the submission of compaction results, but it will not cause
checkpoint timeouts. What do you suggest? Your prompt reply would be
greatly appreciated.
> Flink async compaction is not thread safe when use watermark
> ------------------------------------------------------------
>
> Key: HUDI-3661
> URL: https://issues.apache.org/jira/browse/HUDI-3661
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: hd zhou
> Priority: Major
> Attachments: image-2022-03-18-19-38-39-257.png
>
>
> Async compaction starts an executor that runs the compaction asynchronously
> and sends the compaction result message to the next Flink operator. But
> collector.collect() is not a thread-safe function. When watermarks or
> latency markers are used, they both also call collector.collect(), which may
> cause issues.
> We should not set async compaction = false.
>
> !image-2022-03-18-19-38-39-257.png!
>
>
> !https://git.bilibili.co/datacenter/bili-hudi/uploads/79608d01b0301de84d1d9e3cf24f1d21/image.png!
>
> !https://git.bilibili.co/datacenter/bili-hudi/uploads/e9c2f27d395e708a407bcf40f672c870/image.png!
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)