ehurheap commented on issue #6194:
URL: https://github.com/apache/hudi/issues/6194#issuecomment-1213323341
Sorry for the confusion @nsivabalan. I reviewed the commands you specified and verified they are the same as what I tried. The main differences between what you did and our situation:
- Our Hudi table was loaded by two separate processes: one bulk_insert job and one streaming ingest job.
- In addition, our Hudi table is MOR.

I can run a Spark query similar to yours and verify that there are duplicates in the given partition:
```
val dupePath = "s3://heap-datalake-storage/data/tables/events/env_id=123456789/week=20220711"
val ddF = spark.read.parquet(dupePath)
ddF.createOrReplaceTempView("hoh")
val dupQ = s"""
|select _hoodie_partition_path, _hoodie_record_key, count(*)
| from hoh group by _hoodie_partition_path, _hoodie_record_key
| order by count(*) desc
|""".stripMargin
spark.sql(dupQ).show(false)
```
Sample output:
```
+------------------------------+-------------------------------------------------------------------+--------+
|_hoodie_partition_path        |_hoodie_record_key                                                 |count(1)|
+------------------------------+-------------------------------------------------------------------+--------+
|env_id=123456789/week=20220711|env_id:123456789,user_id:7806358957060773,event_id:5758152328327473|3       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:5403332495352077,event_id:3536309858058402|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:4713648045477470,event_id:8717656941318904|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:2025910439767252,event_id:8549159234261693|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:7507696929673571,event_id:3179806702204642|2       |
|env_id=123456789/week=20220711|env_id:123456789,user_id:7301684312119961,event_id:717438862368076 |2       |
```
But when I run this hudi-cli command:
`hudi:events->repair deduplicate --duplicatedPartitionPath "env_id=123/week=20220711" --repairedOutputPath /tmp/hhdeduplicates --sparkMaster local[2] --sparkMemory 4G --dryrun true --dedupeType "upsert_type"`
The output I get is:
```
22/08/12 16:27:21 ERROR SparkMain: Fail to execute commandString
org.apache.spark.sql.AnalysisException: cannot resolve '_hoodie_record_key' given input columns: []; line 5 pos 15;
'UnresolvedHaving ('dupe_cnt > 1)
+- 'Aggregate ['_hoodie_record_key], ['_hoodie_record_key AS dupe_key#0, count(1) AS dupe_cnt#1L]
   +- SubqueryAlias htbl_1660321638341
      +- View (`htbl_1660321638341`, [])
         +- LocalRelation <empty>
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:573)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:573)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
...
Deduplication failed!
```
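As an aside (this is my own sketch, not anything the CLI produced): since the table is MOR, the same duplicate check can also be run through the Hudi datasource, so the snapshot view (base plus log files) is what gets scanned rather than only the raw parquet base files. The base path here is inferred from `dupePath` above, and on older Hudi versions the load path may need a trailing glob.
```scala
import org.apache.spark.sql.functions.{col, desc}

// My own sketch, not CLI output: scan the same partition through the Hudi
// datasource so the MOR snapshot view (base + log files) is what gets checked.
// Base path is inferred from dupePath above.
val basePath = "s3://heap-datalake-storage/data/tables/events"
val snapDf = spark.read.format("hudi").load(basePath)

snapDf
  .filter(col("_hoodie_partition_path") === "env_id=123456789/week=20220711")
  .groupBy("_hoodie_partition_path", "_hoodie_record_key")
  .count()
  .filter(col("count") > 1)
  .orderBy(desc("count"))
  .show(false)
```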
There are no files in the `--repairedOutputPath` location. My understanding is that if there were data there, we would use that deduplicated data to replace what is currently in Hudi: first deleting the duplicates from Hudi, then loading the deduped data from the repairedOutputPath location.
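For concreteness, the "load it back" half of that workflow is what I would expect to look roughly like the sketch below. This is only a sketch under assumptions: the record key and partition path fields are inferred from the `_hoodie_record_key` values above, the key generator and base path are guesses from our layout, and the precombine field is a placeholder for our real ordering column.
```scala
// Sketch only: re-ingest the repaired files, assuming --repairedOutputPath had
// produced deduplicated parquet. Field names below are inferred/placeholders,
// not something the CLI emitted.
val repairedDf = spark.read.parquet("/tmp/hhdeduplicates")
  // drop Hudi metadata columns before re-writing through the datasource
  .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
        "_hoodie_partition_path", "_hoodie_file_name")

repairedDf.write
  .format("hudi")
  .option("hoodie.table.name", "events")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "env_id,user_id,event_id")
  .option("hoodie.datasource.write.partitionpath.field", "env_id,week")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
  .option("hoodie.datasource.write.precombine.field", "event_time") // placeholder
  .mode("append")
  .save("s3://heap-datalake-storage/data/tables/events")
```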
But since we have no repaired, deduplicated data, we are stuck. Does that make sense? Have I missed something?