Thank you Jeff! I would certainly give it a try. 

Best,
Rahul

On 2021/09/26 22:49:03, Jeff Zhang <zjf...@gmail.com> wrote: 
> Hi kumar,
> 
> You can try Zeppelin, which supports UDF sharing across languages:
> 
> http://zeppelin.apache.org/
> 
> rahul kumar <rk20.stor...@gmail.com> wrote on Mon, Sep 27, 2021 at 4:20 AM:
> 
> > I'm trying to call a function defined in a Scala jar from PySpark (Spark 3.0.2).
> >
> > --- Scala ---
> >
> > import org.apache.spark.sql.DataFrame
> > import org.apache.spark.sql.types.StructType
> >
> > object PythonUtil {
> >
> >   def customdf(dataFrame: DataFrame,
> >                keyCol: String,
> >                table: String,
> >                outputSchema: StructType,
> >                database: String): DataFrame = {
> >     // some transformation of the dataframe, converting it to the
> >     // output schema's types and fields
> >     ...
> >     resultDF
> >   }
> > }
> >
> > # In the Jupyter notebook: schema creation
> > from pyspark.sql.types import (StructType, StructField, StringType,
> >                                IntegerType, LongType, ArrayType)
> >
> > alias = StructType([StructField("first_name", StringType(), False),
> >                     StructField("last_name", StringType(), False)])
> > name = StructType([StructField("first_name", StringType(), False),
> >                    StructField("aliases", ArrayType(alias), False)])
> > street_address = StructType([StructField("street_name", StringType(), False),
> >                              StructField("apt_number", IntegerType(), False)])
> > address = StructType([StructField("zip", LongType(), False),
> >                       StructField("street", street_address, False),
> >                       StructField("city", StringType(), False)])
> > workHistory = StructType([StructField("company_name", StringType(), False),
> >                           StructField("company_address", address, False),
> >                           StructField("worked_from", StringType(), False)])
> >
> > # passing this to the Scala function
> > outputschema = StructType([StructField("name", name, False),
> >                            StructField("SSN", StringType(), False),
> >                            StructField("home_address", ArrayType(address), False)])
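> >
> > (These schemas are plain Python objects on the driver; as far as I know,
> > the only form of a schema that can travel over the py4j bridge as-is is
> > its JSON representation, since StructType.json() returns an ordinary
> > string. For example:)
> >
> > # sanity check: the schema serializes to a plain JSON string
> > print(outputschema.json())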
> >
> > ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
> > ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
> > customerIdsDF=spark.createDataFrame(ssns,["SSN"])
> >
> > scala2_object= sc._jvm.com.mytest.spark.PythonUtil
> > pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
> > 'table', outputschema, 'test'), spark._wrapped)
> >
> > Then I get this error: AttributeError: 'StructField' object has no
> > attribute '_get_object_id'
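> >
> > From the trace it looks like py4j can only pass primitives, Python
> > collections, and handles to existing JVM objects; a PySpark StructType
> > is a pure-Python object, so py4j tries to convert it as a collection and
> > its StructField elements have no _get_object_id. Would converting the
> > schema to its JVM counterpart via its JSON form be the right fix?
> > Something like this (assuming DataType.fromJson on the Scala companion
> > object is reachable through the gateway as a static method):
> >
> > # sketch: rebuild the schema as a JVM StructType before the call
> > jschema = sc._jvm.org.apache.spark.sql.types.DataType.fromJson(
> >     outputschema.json())
> > result = pyspark.sql.DataFrame(
> >     scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',
> >                            jschema, 'test'),
> >     spark._wrapped)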
> >
> > Full stack trace:
> > ---------------------------------------------------------------------------
> > AttributeError                            Traceback (most recent call last)
> > <ipython-input-25-74a3b3e652e6> in <module>
> >       4
> >       5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
> > ----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf,
> > 'SSN', 'table',smallSchema, 'test'), spark._wrapped)
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> > in __call__(self, *args)
> >    1294
> >    1295     def __call__(self, *args):
> > -> 1296         args_command, temp_args = self._build_args(*args)
> >    1297
> >    1298         command = proto.CALL_COMMAND_NAME +\
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> > in _build_args(self, *args)
> >    1258     def _build_args(self, *args):
> >    1259         if self.converters is not None and len(self.converters) >
> > 0:
> > -> 1260             (new_args, temp_args) = self._get_args(args)
> >    1261         else:
> >    1262             new_args = args
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> > in _get_args(self, args)
> >    1245                 for converter in self.gateway_client.converters:
> >    1246                     if converter.can_convert(arg):
> > -> 1247                         temp_arg = converter.convert(arg,
> > self.gateway_client)
> >    1248                         temp_args.append(temp_arg)
> >    1249                         new_args.append(temp_arg)
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py
> > in convert(self, object, gateway_client)
> >     509         java_list = ArrayList()
> >     510         for element in object:
> > --> 511             java_list.add(element)
> >     512         return java_list
> >     513
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> > in __call__(self, *args)
> >    1294
> >    1295     def __call__(self, *args):
> > -> 1296         args_command, temp_args = self._build_args(*args)
> >    1297
> >    1298         command = proto.CALL_COMMAND_NAME +\
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> > in _build_args(self, *args)
> >    1264
> >    1265         args_command = "".join(
> > -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
> >    1267
> >    1268         return args_command, temp_args
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py
> > in <listcomp>(.0)
> >    1264
> >    1265         args_command = "".join(
> > -> 1266             [get_command_part(arg, self.pool) for arg in new_args])
> >    1267
> >    1268         return args_command, temp_args
> >
> > ~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py
> > in get_command_part(parameter, python_proxy_pool)
> >     296             command_part += ";" + interface
> >     297     else:
> > --> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
> >     299
> >     300     command_part += "\n"
> >
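> > The trace suggests py4j's ListConverter treats the StructType as a plain
> > Python collection and tries to add each StructField to a Java ArrayList,
> > which is where _get_object_id is missing. An alternative I'm considering
> > (it would mean changing customdf to take the schema as a String and
> > rebuild it with DataType.fromJson on the Scala side) is to pass only the
> > JSON form:
> >
> > # alternative sketch: send the schema as a JSON string instead
> > pyspark.sql.DataFrame(
> >     scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',
> >                            outputschema.json(), 'test'),
> >     spark._wrapped)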
> 
> -- 
> Best Regards
> 
> Jeff Zhang
> 

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
