Thanks Sean,

- I have tried executing it without wrapping it in the DataFrame constructor, but got the same error: "AttributeError: 'StructField' object has no attribute '_get_object_id'".
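A minimal sketch of the pattern being asked for in this thread, assuming the failure comes from py4j trying to auto-convert the pure-Python StructType rather than from the Scala side. A UDF cannot return a DataFrame (UDF return types must be Spark SQL data types), so the usual route is to call the Scala method directly through the gateway, passing the schema as JSON and rebuilding it on the JVM. Names such as PythonUtil.customdf, outputschema and customerIdsDF are the ones from the thread below; this is untested against the actual jar:

    from pyspark.sql import DataFrame

    # Rebuild the Python StructType as a JVM StructType via its JSON form;
    # py4j cannot convert pyspark.sql.types objects on its own.
    j_schema = sc._jvm.org.apache.spark.sql.types.DataType.fromJson(outputschema.json())

    # Call the Scala object's method with JVM-side arguments only.
    scala_object = sc._jvm.com.mytest.spark.PythonUtil
    j_df = scala_object.customdf(customerIdsDF._jdf, 'SSN', 'table', j_schema, 'test')

    # Wrap the returned Java DataFrame for use from Python (Spark 3.0.x API).
    result_df = DataFrame(j_df, spark._wrapped)
    result_df.printSchema()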
- I have also tried using a Scala UDF:

    class PythonUtil() extends UDF5[DataFrame, String, String, StructType, String, DataFrame] with Serializable {
      def call(t1: DataFrame, keyCol: String, set: String, outputSchema: StructType, namespace: String): DataFrame = {
        HelperFunctions.doTransformation(t1, keyCol, set, outputSchema, namespace)
      }
    }

  with the corresponding Python section:

    spark.udf.registerJavaFunction('com.mytest.spark.PythonUtil', 'com.mytest.spark.PythonUtil', pyspark.sql.dataframe.DataFrame())

  Here, in the return type, I'm not able to provide the correct syntax for DataFrame. It appears DataFrame requires data and a SQLContext (unlike simple types like StringType() etc.).

- Since I was not able to pass a DataFrame between Scala and Python, I tried working on a view in Scala, but didn't succeed there either.

Would you be able to point me to a sample where folks are passing a DataFrame and a schema to a Scala jar and getting a DataFrame back?

Thanks,
Rahul

On 2021/09/26 23:21:36, Sean Owen <sro...@gmail.com> wrote:
> You can also call a Scala UDF from Python in Spark - this doesn't need
> Zeppelin or relate to the front-end.
> This may indeed be much easier as a proper UDF; depends on what this
> function does.
> However I think the issue may be that you're trying to wrap the resulting
> DataFrame in a DataFrame or something. First inspect what you get back from
> the invocation of the Scala method.
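To do the inspection suggested above, the raw py4j result can be checked before any wrapping — assuming the call itself succeeds once the schema argument is a JVM object (scala_object and j_schema as in the sketch earlier in the thread):

    ret = scala_object.customdf(customerIdsDF._jdf, 'SSN', 'table', j_schema, 'test')
    print(type(ret))                 # expect py4j.java_gateway.JavaObject
    print(ret.getClass().getName())  # expect org.apache.spark.sql.Dataset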
> On Sun, Sep 26, 2021 at 5:50 PM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > Hi kumar,
> >
> > You can try Zeppelin, which supports UDF sharing across languages:
> >
> > http://zeppelin.apache.org/
> >
> >
> > rahul kumar <rk20.stor...@gmail.com> wrote on Mon, Sep 27, 2021 at 4:20 AM:
> >
> >> I'm trying to use a function defined in a Scala jar in PySpark (Spark
> >> 3.0.2).
> >>
> >> --- Scala ---
> >>
> >> object PythonUtil {
> >>
> >>   def customdf(dataFrame: DataFrame,
> >>                keyCol: String,
> >>                table: String,
> >>                outputSchema: StructType,
> >>                database: String): DataFrame = {
> >>     // some transformation of the dataframe, converting it as per the
> >>     // output schema's types and fields.
> >>     ...
> >>     resultDF
> >>   }
> >> }
> >>
> >> # In the Jupyter notebook, schema creation:
> >> alias = StructType([StructField("first_name", StringType(), False),
> >>                     StructField("last_name", StringType(), False)])
> >> name = StructType([StructField("first_name", StringType(), False),
> >>                    StructField("aliases", ArrayType(alias), False)])
> >> street_adress = StructType([StructField("street_name", StringType(), False),
> >>                             StructField("apt_number", IntegerType(), False)])
> >> address = StructType([StructField("zip", LongType(), False),
> >>                       StructField("street", street_adress, False),
> >>                       StructField("city", StringType(), False)])
> >> workHistory = StructType([StructField("company_name", StringType(), False),
> >>                           StructField("company_address", address, False),
> >>                           StructField("worked_from", StringType(), False)])
> >>
> >> # passing this to the scala function
> >> outputschema = StructType([StructField("name", name, False),
> >>                            StructField("SSN", StringType(), False),
> >>                            StructField("home_address", ArrayType(address), False)])
> >>
> >> ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
> >>         ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
> >> customerIdsDF = spark.createDataFrame(ssns, ["SSN"])
> >>
> >> scala2_object = sc._jvm.com.mytest.spark.PythonUtil
> >> pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
> >>                                              'table', outputschema, 'test'),
> >>                       spark._wrapped)
> >>
> >> Then I get the error: AttributeError: 'StructField' object has no
> >> attribute '_get_object_id'
> >>
> >> Full stacktrace:
> >>
> >> ---------------------------------------------------------------------------
> >> AttributeError                            Traceback (most recent call last)
> >> <ipython-input-25-74a3b3e652e6> in <module>
> >>       4
> >>       5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
> >> ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table', smallSchema, 'test'), spark._wrapped)
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
> >>    1294
> >>    1295     def __call__(self, *args):
> >> -> 1296         args_command, temp_args = self._build_args(*args)
> >>    1297
> >>    1298         command = proto.CALL_COMMAND_NAME +\
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
> >>    1258     def _build_args(self, *args):
> >>    1259         if self.converters is not None and len(self.converters) > 0:
> >> -> 1260             (new_args, temp_args) = self._get_args(args)
> >>    1261         else:
> >>    1262             new_args = args
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _get_args(self, args)
> >>    1245             for converter in self.gateway_client.converters:
> >>    1246                 if converter.can_convert(arg):
> >> -> 1247                     temp_arg = converter.convert(arg, self.gateway_client)
> >>    1248                     temp_args.append(temp_arg)
> >>    1249                     new_args.append(temp_arg)
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py in convert(self, object, gateway_client)
> >>     509         java_list = ArrayList()
> >>     510         for element in object:
> >> --> 511             java_list.add(element)
> >>     512         return java_list
> >>     513
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
> >>    1294
> >>    1295     def __call__(self, *args):
> >> -> 1296         args_command, temp_args = self._build_args(*args)
> >>    1297
> >>    1298         command = proto.CALL_COMMAND_NAME +\
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
> >>    1264
> >>    1265         args_command = "".join(
> >> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
> >>    1267
> >>    1268         return args_command, temp_args
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in <listcomp>(.0)
> >>    1264
> >>    1265         args_command = "".join(
> >> -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
> >>    1267
> >>    1268         return args_command, temp_args
> >>
> >> ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
> >>     296             command_part += ";" + interface
> >>     297         else:
> >> --> 298             command_part = REFERENCE_TYPE + parameter._get_object_id()
> >>     299
> >>     300         command_part += "\n"
> >>
> >> AttributeError: 'StructField' object has no attribute '_get_object_id'
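What the traceback shows, in short: py4j's list converter sees that a pyspark StructType is iterable, tries to copy it element by element into a java.util.ArrayList (the java_collections.py frame above), and each plain-Python StructField then fails because only py4j JavaObjects carry _get_object_id. A small sketch to verify this locally, assuming only pyspark is installed (no running cluster needed):

    from pyspark.sql.types import StructField, StringType, StructType

    schema = StructType([StructField("SSN", StringType(), False)])

    # StructType is iterable, so py4j's collection converter matches it...
    print(hasattr(schema, "__iter__"))                  # True
    # ...but its elements are plain Python objects, not py4j JavaObjects,
    # hence the AttributeError when py4j asks for _get_object_id.
    print(hasattr(schema.fields[0], "_get_object_id"))  # False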
> >
> > --
> > Best Regards
> >
> > Jeff Zhang