maheshguptags opened a new issue, #12988:
URL: https://github.com/apache/hudi/issues/12988

   
I am experiencing an issue when deleting records from a Hudi table where data is ingested by a Flink streaming job and the deletion is attempted by a separate Hudi batch job. Despite specifying a partition condition in the DELETE query, Hudi scans all partitions, which causes high resource usage and timeouts.
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
1. Continuously ingest data into the table using a Hudi Flink streaming job.
2. Create the Hudi table with the config below (a separate batch job deletes data from the same table):
   ```
   CREATE TABLE IF NOT EXISTS hudi_temp (
     x STRING,
     _date STRING,
     _count BIGINT,
     type STRING,
     update_date TIMESTAMP(3)
   ) PARTITIONED BY (`x`) WITH (
     'connector' = 'hudi',
     'path' = '${bucket_path_daily}',
     'table.type' = 'COPY_ON_WRITE',
     'write.operation' = 'delete',
     'hoodie.datasource.write.recordkey.field' = 'x,_date',
     'hoodie.datasource.write.precombine.field' = 'update_date',
     'hoodie.datasource.write.partitionpath.field' = 'x',
     'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
     'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
     'hoodie.cleaner.policy.failed.writes' = 'LAZY'
   );

   EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
   TableEnvironment tEnv = TableEnvironment.create(settings);

   tEnv.executeSql(createDeleteTableDDL);
   tEnv.executeSql("DELETE FROM hudi_temp WHERE x = 'cl-278'").await();
   tEnv.executeSql("SELECT * FROM hudi_temp WHERE x = 'cl-278'").print();
   ```
   
   3. Deploy the delete job.
   4. The delete job follows the Flink DELETE statement documentation: https://github.com/apache/flink/blob/release-1.20/docs/content/docs/dev/table/sql/delete.md (a plan-check sketch to verify predicate pushdown follows this list).
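
   To see whether the `x = 'cl-278'` predicate actually reaches the Hudi source, the optimized plan can be printed for a read with the same filter. A minimal sketch using Flink SQL's standard `EXPLAIN`, assuming it runs in the same session where the table from step 2 is registered:

   ```
   -- If partition pruning works, the Hudi scan in the plan should be limited
   -- to the x=cl-278 partition instead of listing all partitions.
   EXPLAIN SELECT * FROM hudi_temp WHERE x = 'cl-278';
   ```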
   
   **Expected behavior**
   
   Hudi should scan only the relevant partition (`x = 'cl-278'`) when performing the DELETE, reducing resource usage and preventing timeouts. It should delete only the partition/records matching the condition given in the DELETE statement of step 2.
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   * Spark version : N/A (not used)
   * Flink version : 1.18.1
   * Hive version :
   * Hadoop version :
   * Storage (HDFS/S3/GCS..) : S3
   * Environment : Kubernetes (k8s)
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   There appear to be multiple issues here:
   1. Why is Hudi scanning all partitions even though the partition field and its value are given in the query itself?
   2. The partition pruner is invoked with the right condition (log below; `client_id` appears to be the unsanitized name of the `x` column), yet all the data is still scanned.
   ```
   2025-03-03 10:47:33,191 INFO  org.apache.hudi.util.StreamerUtil [] - Table option [hoodie.datasource.write.keygenerator.class] is reset to org.apache.hudi.keygen.ComplexAvroKeyGenerator because record key or partition path has two or more fields
   2025-03-03 10:47:36,293 INFO  org.apache.hudi.table.HoodieTableSource [] - Partition pruner for hoodie source, condition is:
   equals(client_id, 'cl-278')
   ```
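
   The first log line is expected: with a two-field record key (`x,_date`), Hudi resets the key generator to `ComplexAvroKeyGenerator`. The class can be pinned explicitly so the option is not reset at runtime (option key and class name are taken from the log line above); the second line confirms the pruner does receive the partition condition, which makes the full scan more surprising. A minimal sketch:

   ```
   -- Optional: pin the key generator Hudi would pick anyway, so the table
   -- option is not reset at runtime (names taken from the log line above).
   ALTER TABLE hudi_temp SET (
     'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
   );
   ```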
   
   **Stacktrace**
   
   
   Deleting even a single record in partition `cl-278` runs for about 10 minutes, deletes nothing, and fails with the exception below:
   ```
   Caused by: org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
        at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57)
        at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:269)
        at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:452)
        at org.apache.hudi.sink.StreamWriteFunction.endInput(StreamWriteFunction.java:157)
        at org.apache.hudi.sink.common.AbstractWriteOperator.endInput(AbstractWriteOperator.java:48)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Unknown Source)
   ```
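
   One element of the setup that could relate to this wait: the table enables optimistic concurrency control with `InProcessLockProvider`, which only coordinates writers inside a single JVM, while the streaming ingest and this batch delete run as two separate jobs. A hedged sketch switching to a lock provider both processes can see; the availability of `FileSystemBasedLockProvider` and its behavior on S3 should be verified against Hudi 0.15.0:

   ```
   -- Hypothetical change: use a lock provider visible to both jobs instead of
   -- the JVM-local InProcessLockProvider. Verify this class exists in 0.15.0.
   ALTER TABLE hudi_temp SET (
     'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider'
   );
   ```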
   Screenshot of the failed deletion for reference:
   
   
![Image](https://github.com/user-attachments/assets/065e73bf-5565-4223-8624-ca085e174449)
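
   If the wait simply needs more headroom while the root cause is investigated, the ack timeout can likely be raised. The 601000 ms figure matches Flink's default 10-minute checkpoint timeout plus one second, and `write.commit.ack.timeout` is assumed to be the option behind this `TimeWait`; both points are worth verifying for 0.15.0:

   ```
   -- Hypothetical workaround, not a fix for the full-table scan: lengthen the
   -- instant-ack wait (value in milliseconds; 1800000 = 30 minutes).
   ALTER TABLE hudi_temp SET (
     'write.commit.ack.timeout' = '1800000'
   );
   ```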
    
   cc : @ad1happy2go @cshuo @danny0405 @bhasudha 

