mbutrovich commented on code in PR #2447:
URL: https://github.com/apache/datafusion-comet/pull/2447#discussion_r2408069317


##########
spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala:
##########
@@ -47,90 +50,399 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
     encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
   private val key1 = encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8))
   private val key2 = encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
+  private val cryptoFactoryClass =
+    "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
 
   test("SPARK-34990: Write and read an encrypted parquet") {
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
 
     import testImplicits._
 
-    Seq("org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory").foreach {
-      factoryClass =>
-        withTempDir { dir =>
-          withSQLConf(
-            "parquet.crypto.factory.class" -> factoryClass,
-            "parquet.encryption.kms.client.class" ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            "parquet.encryption.key.list" ->
-              s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
-
-            // Make sure encryption works with multiple Parquet files
-            val inputDF = spark
-              .range(0, 2000)
-              .map(i => (i, i.toString, i.toFloat))
-              .repartition(10)
-              .toDF("a", "b", "c")
-            val parquetDir = new File(dir, "parquet").getCanonicalPath
-            inputDF.write
-              .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
-              .option("parquet.encryption.footer.key", "footerKey")
-              .parquet(parquetDir)
-
-            verifyParquetEncrypted(parquetDir)
-
-            val parquetDF = spark.read.parquet(parquetDir)
-            assert(parquetDF.inputFiles.nonEmpty)
-            val readDataset = parquetDF.select("a", "b", "c")
-
-            if (CometConf.COMET_ENABLED.get(conf)) {
-              checkSparkAnswerAndOperator(readDataset)
-            } else {
-              checkAnswer(readDataset, inputDF)
-            }
-          }
+    withTempDir { dir =>
+      withSQLConf(
+        DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+        KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+        // Make sure encryption works with multiple Parquet files
+        val inputDF = spark
+          .range(0, 2000)
+          .map(i => (i, i.toString, i.toFloat))
+          .repartition(10)
+          .toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, "key1: a, b; key2: c")
+          .option(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "footerKey")
+          .parquet(parquetDir)
+
+        verifyParquetEncrypted(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+
+        if (CometConf.COMET_ENABLED.get(conf)) {
+          checkSparkAnswerAndOperator(readDataset)
+        } else {
+          checkAnswer(readDataset, inputDF)
         }
+      }
     }
   }
 
   test("SPARK-37117: Can't read files in Parquet encryption external key 
material mode") {
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
-    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
 
     import testImplicits._
 
-    Seq("org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory").foreach {
-      factoryClass =>
-        withTempDir { dir =>
-          withSQLConf(
-            "parquet.crypto.factory.class" -> factoryClass,
-            "parquet.encryption.kms.client.class" ->
-              "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
-            "parquet.encryption.key.material.store.internally" -> "false",
-            "parquet.encryption.key.list" ->
-              s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
-
-            val inputDF = spark
-              .range(0, 2000)
-              .map(i => (i, i.toString, i.toFloat))
-              .repartition(10)
-              .toDF("a", "b", "c")
-            val parquetDir = new File(dir, "parquet").getCanonicalPath
-            inputDF.write
-              .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
-              .option("parquet.encryption.footer.key", "footerKey")
-              .parquet(parquetDir)
-
-            val parquetDF = spark.read.parquet(parquetDir)
-            assert(parquetDF.inputFiles.nonEmpty)
-            val readDataset = parquetDF.select("a", "b", "c")
-
-            if (CometConf.COMET_ENABLED.get(conf)) {
-              checkSparkAnswerAndOperator(readDataset)
-            } else {
-              checkAnswer(readDataset, inputDF)
-            }
-          }
+    withTempDir { dir =>
+      withSQLConf(
+        DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+        KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        KeyToolkit.KEY_MATERIAL_INTERNAL_PROPERTY_NAME -> "false", // default is true
+        InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+        val inputDF = spark
+          .range(0, 2000)
+          .map(i => (i, i.toString, i.toFloat))
+          .repartition(10)
+          .toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, "key1: a, b; key2: c")
+          .option(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "footerKey")
+          .parquet(parquetDir)
+
+        verifyParquetEncrypted(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+
+        if (CometConf.COMET_ENABLED.get(conf)) {
+          checkSparkAnswerAndOperator(readDataset)
+        } else {
+          checkAnswer(readDataset, inputDF)
+        }
+      }
+    }
+  }
+
+  test("SPARK-42114: Test of uniform parquet encryption") {
+
+    import testImplicits._
+
+    withTempDir { dir =>
+      withSQLConf(
+        DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+        KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+          s"key1: ${key1}") {
+
+        val inputDF = spark
+          .range(0, 2000)
+          .map(i => (i, i.toString, i.toFloat))
+          .repartition(10)
+          .toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option("parquet.encryption.uniform.key", "key1")
+          .parquet(parquetDir)
+
+        verifyParquetEncrypted(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+
+        if (CometConf.COMET_ENABLED.get(conf)) {
+          checkSparkAnswerAndOperator(readDataset)
+        } else {
+          checkAnswer(readDataset, inputDF)
+        }
+      }
+    }
+  }
+
+  test("Plain text footer mode") {
+    import testImplicits._
+
+    withTempDir { dir =>
+      withSQLConf(
+        DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+        KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        PropertiesDrivenCryptoFactory.PLAINTEXT_FOOTER_PROPERTY_NAME -> "true", // default is false
+        InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+        val inputDF = spark
+          .range(0, 1000)
+          .map(i => (i, i.toString, i.toFloat))
+          .repartition(5)
+          .toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, "key1: a, b; key2: c")
+          .option(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "footerKey")
+          .parquet(parquetDir)
+
+        verifyParquetPlaintextFooter(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+
+        if (CometConf.COMET_ENABLED.get(conf)) {
+          checkSparkAnswerAndOperator(readDataset)
+        } else {
+          checkAnswer(readDataset, inputDF)
+        }
+      }
+    }
+  }
+
+  test("Change encryption algorithm") {
+    import testImplicits._
+
+    withTempDir { dir =>
+      withSQLConf(
+        DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME -> cryptoFactoryClass,
+        KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        // default is AES_GCM_V1
+        PropertiesDrivenCryptoFactory.ENCRYPTION_ALGORITHM_PROPERTY_NAME -> "AES_GCM_CTR_V1",
+        InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+        val inputDF = spark
+          .range(0, 1000)
+          .map(i => (i, i.toString, i.toFloat))
+          .repartition(5)
+          .toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option(PropertiesDrivenCryptoFactory.COLUMN_KEYS_PROPERTY_NAME, "key1: a, b; key2: c")
+          .option(PropertiesDrivenCryptoFactory.FOOTER_KEY_PROPERTY_NAME, "footerKey")
+          .parquet(parquetDir)
+
+        verifyParquetEncrypted(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+
+        // native_datafusion and native_iceberg_compat fall back due to Arrow-rs
+        // https://github.com/apache/arrow-rs/blob/main/parquet/src/file/metadata/parser.rs#L414

Review Comment:
   Yeah, I shouldn't link against main since that can change.
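   A permalink pinned to a tag or commit SHA would keep the reference stable, e.g.
   https://github.com/apache/arrow-rs/blob/<commit-sha>/parquet/src/file/metadata/parser.rs#L414
   (where `<commit-sha>` is a placeholder for whatever revision the line number was taken from).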
   


