sryza commented on code in PR #51590:
URL: https://github.com/apache/spark/pull/51590#discussion_r2226019454


##########
python/pyspark/pipelines/block_imperative_construct.py:
##########
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, NoReturn, List, Callable
+
+from pyspark.errors import PySparkException
+from pyspark.sql.connect.catalog import Catalog
+from pyspark.sql.connect.conf import RuntimeConf
+from pyspark.sql.connect.dataframe import DataFrame
+from pyspark.sql.connect.udf import UDFRegistration
+
+# pyspark methods that should be blocked from executing in python pipeline 
definition files
+BLOCKED_METHODS: List = [
+    {
+        "class": RuntimeConf,
+        "method": "set",
+        "suggestion": "Instead set configuration via the pipeline spec "
+        "or use the 'spark_conf' argument in various decorators",
+    },
+    {
+        "class": Catalog,
+        "method": "setCurrentCatalog",
+        "suggestion": "Instead set catalog via the pipeline spec "
+        "or the 'name' argument on the dataset decorators",
+    },
+    {
+        "class": Catalog,
+        "method": "setCurrentDatabase",
+        "suggestion": "Instead set database via the pipeline spec "
+        "or the 'name' argument on the dataset decorators",
+    },
+    {
+        "class": Catalog,
+        "method": "dropTempView",
+        "suggestion": "Instead remove the temporary view definition directly",
+    },
+    {
+        "class": Catalog,
+        "method": "dropGlobalTempView",
+        "suggestion": "Instead remove the temporary view definition directly",
+    },
+    {
+        "class": DataFrame,
+        "method": "createTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": DataFrame,
+        "method": "createOrReplaceTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": DataFrame,
+        "method": "createGlobalTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": DataFrame,
+        "method": "createOrReplaceGlobalTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": UDFRegistration,
+        "method": "register",
+        "suggestion": "",
+    },
+    {
+        "class": UDFRegistration,
+        "method": "registerJavaFunction",
+        "suggestion": "",
+    },
+    {
+        "class": UDFRegistration,
+        "method": "registerJavaUDAF",
+        "suggestion": "",
+    },
+]
+
+
+def _create_blocked_method(error_method_name: str, suggestion: str) -> 
Callable:
+    def blocked_method(*args: object, **kwargs: object) -> NoReturn:
+        raise PySparkException(
+            errorClass="IMPERATIVE_CONSTRUCT_IN_DECLARATIVE_PIPELINE",
+            messageParameters={
+                "method": error_method_name,
+                "suggestion": suggestion,
+            },
+        )
+
+    return blocked_method
+
+
+@contextmanager
+def block_imperative_construct() -> Generator[None, None, None]:

Review Comment:
   ```suggestion
   def block_imperative_constructs() -> Generator[None, None, None]:
   ```



##########
python/pyspark/errors/error-conditions.json:
##########
@@ -363,6 +363,12 @@
       "Function `<func_name>` should return Column, got <return_type>."
     ]
   },
+  "IMPERATIVE_CONSTRUCT_IN_DECLARATIVE_PIPELINE": {
+    "message": [
+      "Imperative construct <method> is not allowed in declarative pipelines.",

Review Comment:
   People can have different ideas of what "imperative" means, so I wonder if 
we should be more specific here. E.g. "Changing Spark session state is not 
allowed in declarative pipelines" or "Imperative constructs like changing Spark 
session state are not allowed in declarative pipelines".



##########
python/pyspark/pipelines/block_imperative_construct.py:
##########
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, NoReturn, List, Callable
+
+from pyspark.errors import PySparkException
+from pyspark.sql.connect.catalog import Catalog
+from pyspark.sql.connect.conf import RuntimeConf
+from pyspark.sql.connect.dataframe import DataFrame
+from pyspark.sql.connect.udf import UDFRegistration
+
+# pyspark methods that should be blocked from executing in python pipeline 
definition files
+BLOCKED_METHODS: List = [
+    {
+        "class": RuntimeConf,
+        "method": "set",
+        "suggestion": "Instead set configuration via the pipeline spec "
+        "or use the 'spark_conf' argument in various decorators",
+    },
+    {
+        "class": Catalog,
+        "method": "setCurrentCatalog",
+        "suggestion": "Instead set catalog via the pipeline spec "
+        "or the 'name' argument on the dataset decorators",
+    },
+    {
+        "class": Catalog,
+        "method": "setCurrentDatabase",
+        "suggestion": "Instead set database via the pipeline spec "
+        "or the 'name' argument on the dataset decorators",
+    },
+    {
+        "class": Catalog,
+        "method": "dropTempView",
+        "suggestion": "Instead remove the temporary view definition directly",
+    },
+    {
+        "class": Catalog,
+        "method": "dropGlobalTempView",
+        "suggestion": "Instead remove the temporary view definition directly",
+    },
+    {
+        "class": DataFrame,
+        "method": "createTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": DataFrame,
+        "method": "createOrReplaceTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": DataFrame,
+        "method": "createGlobalTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": DataFrame,
+        "method": "createOrReplaceGlobalTempView",
+        "suggestion": "Instead use the @temporary_view decorator to define 
temporary views",
+    },
+    {
+        "class": UDFRegistration,
+        "method": "register",
+        "suggestion": "",
+    },
+    {
+        "class": UDFRegistration,
+        "method": "registerJavaFunction",
+        "suggestion": "",
+    },
+    {
+        "class": UDFRegistration,
+        "method": "registerJavaUDAF",
+        "suggestion": "",
+    },
+]
+
+
+def _create_blocked_method(error_method_name: str, suggestion: str) -> 
Callable:
+    def blocked_method(*args: object, **kwargs: object) -> NoReturn:
+        raise PySparkException(
+            errorClass="IMPERATIVE_CONSTRUCT_IN_DECLARATIVE_PIPELINE",
+            messageParameters={
+                "method": error_method_name,
+                "suggestion": suggestion,
+            },
+        )
+
+    return blocked_method
+
+
+@contextmanager
+def block_imperative_construct() -> Generator[None, None, None]:
+    """
+    Context manager that blocks imperative constructs found in a pipeline 
python definition file
+    Blocks:
+        - imperative config set via: spark.conf.set("k", "v")

Review Comment:
   This list can easily get out of sync with BLOCKED_METHODS if we add more 
entries. I think it's better to reference BLOCKED_METHODS here than to 
enumerate all of its contents.



##########
python/pyspark/pipelines/block_imperative_construct.py:
##########
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, NoReturn, List, Callable
+
+from pyspark.errors import PySparkException
+from pyspark.sql.connect.catalog import Catalog
+from pyspark.sql.connect.conf import RuntimeConf
+from pyspark.sql.connect.dataframe import DataFrame
+from pyspark.sql.connect.udf import UDFRegistration
+
+# pyspark methods that should be blocked from executing in python pipeline 
definition files
+BLOCKED_METHODS: List = [
+    {
+        "class": RuntimeConf,
+        "method": "set",
+        "suggestion": "Instead set configuration via the pipeline spec "

Review Comment:
   It would be better to have this text inside error-conditions.json – 
that way it's in a central place that can be internationalized more easily. 
What do you think about having a sub-error code for each of these, e.g. 
`SET_CURRENT_CATALOG`?



##########
python/pyspark/errors/error-conditions.json:
##########
@@ -363,6 +363,12 @@
       "Function `<func_name>` should return Column, got <return_type>."
     ]
   },
+  "IMPERATIVE_CONSTRUCT_IN_DECLARATIVE_PIPELINE": {

Review Comment:
   ```suggestion
     "SESSION_MUTATION_IN_DECLARATIVE_PIPELINE": {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to