mbutrovich commented on code in PR #2850:
URL: https://github.com/apache/datafusion-comet/pull/2850#discussion_r2590553516
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -217,22 +217,14 @@ abstract class CometNativeExec extends CometExec {
// TODO: support native metrics for all operators.
val nativeMetrics = CometMetricNode.fromCometPlan(this)
+ // Go over all all the native scans, in order to see if they need encryption options.
Review Comment:
typo: "all all"
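
For context, a minimal sketch of the kind of pre-execution pass this comment describes, collecting the plan's native scans so their Parquet decryption options can be resolved up front. This is not the PR's implementation; `collectNativeScans` is a hypothetical helper, and the `CometNativeScanExec` import path is assumed:

```scala
import org.apache.spark.sql.comet.CometNativeScanExec // assumed package
import org.apache.spark.sql.execution.SparkPlan

// Hedged sketch, not the PR's code: walk the plan tree and gather every
// native scan so its encryption options can be checked before the native
// plan is serialized.
def collectNativeScans(plan: SparkPlan): Seq[CometNativeScanExec] =
  plan.collect { case scan: CometNativeScanExec => scan }
```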
##########
spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala:
##########
@@ -394,53 +395,60 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
val parquetDF2 = spark.read.parquet(parquetDir2)
val unionDF = parquetDF1.union(parquetDF2)
+ // Since the union has its own executeColumnar, problems would not surface if it is the last operator.
+ // If we add another Comet aggregate after the union, we see the need for the
+ // foreachUntilCometInput() in operators.scala, as we would otherwise error on
+ // multiple native scan execs even though they are no longer in the same native plan.
+ val aggDf = unionDF.agg(functions.sum("id"))
if (CometConf.COMET_ENABLED.get(conf)) {
- checkSparkAnswerAndOperator(unionDF)
+ checkSparkAnswerAndOperator(aggDf)
} else {
- checkSparkAnswer(unionDF)
+ checkSparkAnswer(aggDf)
}
}
}
- }
-
- test("Test different key lengths") {
- import testImplicits._
-
- withTempDir { dir =>
- withSQLConf(
- DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
- KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
- "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
- KeyToolkit.DATA_KEY_LENGTH_PROPERTY_NAME -> "256",
- KeyToolkit.KEK_LENGTH_PROPERTY_NAME -> "256",
- InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
- s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
- val inputDF = spark
- .range(0, 1000)
- .map(i => (i, i.toString, i.toFloat))
- .repartition(5)
- .toDF("a", "b", "c")
- val parquetDir = new File(dir, "parquet").getCanonicalPath
- inputDF.write
- .option(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, "key1: a, b; key2: c")
- .option(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "footerKey")
- .parquet(parquetDir)
-
- verifyParquetEncrypted(parquetDir)
-
- val parquetDF = spark.read.parquet(parquetDir)
- assert(parquetDF.inputFiles.nonEmpty)
- val readDataset = parquetDF.select("a", "b", "c")
-
- // native_datafusion and native_iceberg_compat fall back due to Arrow-rs not
- // supporting other key lengths
- if (CometConf.COMET_ENABLED.get(conf) && CometConf.COMET_NATIVE_SCAN_IMPL.get(
- conf) == SCAN_NATIVE_COMET) {
- checkSparkAnswerAndOperator(readDataset)
- } else {
- checkAnswer(readDataset, inputDF)
+ test("Test different key lengths") {
Review Comment:
Did a formatting change cause the diff for "Test different key lengths"?
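
For reference, the plan shape the new test exercises can be sketched standalone. Paths and the column name are illustrative, not taken from the PR:

```scala
import org.apache.spark.sql.{functions, SparkSession}

// Hedged sketch of the repro described in the test comment: two independent
// Parquet scans feeding a union, with an aggregate on top so the union is no
// longer the terminal operator of the plan.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df1 = spark.read.parquet("/tmp/parquet1") // illustrative path
val df2 = spark.read.parquet("/tmp/parquet2") // illustrative path
val aggDf = df1.union(df2).agg(functions.sum("id"))
aggDf.collect() // forces execution past the union's executeColumnar
```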
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]