I'm trying to use a function defined in a Scala jar from PySpark (Spark 3.0.2).
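The jar containing PythonUtil is on the driver classpath; a minimal sketch of my session setup (the jar path here is just a placeholder):

from pyspark.sql import SparkSession

# "/path/to/my-utils.jar" is a placeholder for the jar that contains PythonUtil
spark = SparkSession.builder \
    .config("spark.jars", "/path/to/my-utils.jar") \
    .getOrCreate()
sc = spark.sparkContext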

--- scala ---

object PythonUtil {

  def customdf(dataFrame: DataFrame,
               keyCol: String,
               table: String,
               outputSchema: StructType,
               database: String): DataFrame = {

    // some transformation of the dataframe, converted to the output
    // schema's types and fields
    ...
    resultDF
  }
}

--- python (Jupyter notebook) ---

# schema creation
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, LongType, ArrayType)

alias = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False)])
name = StructType([
    StructField("first_name", StringType(), False),
    StructField("aliases", ArrayType(alias), False)])
street_address = StructType([
    StructField("street_name", StringType(), False),
    StructField("apt_number", IntegerType(), False)])
address = StructType([
    StructField("zip", LongType(), False),
    StructField("street", street_address, False),
    StructField("city", StringType(), False)])
workHistory = StructType([
    StructField("company_name", StringType(), False),
    StructField("company_address", address, False),
    StructField("worked_from", StringType(), False)])

# passing this schema to the Scala function
outputschema = StructType([
    StructField("name", name, False),
    StructField("SSN", StringType(), False),
    StructField("home_address", ArrayType(address), False)])
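As a sanity check, the nested schema serializes to JSON cleanly; as far as I understand, StructType.json() produces the representation the JVM side can parse back, which becomes relevant further down:

# round-trippable JSON view of the schema
print(outputschema.json())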

ssns = [["825-55-3247"], ["289-18-1554"], ["756-46-4088"],
        ["525-31-0299"], ["456-45-2200"], ["200-71-7765"]]
customerIdsDF = spark.createDataFrame(ssns, ["SSN"])
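(customerIdsDF is just a single-column DataFrame of SSN strings:)

customerIdsDF.printSchema()
# root
#  |-- SSN: string (nullable = true)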

scala2_object = sc._jvm.com.mytest.spark.PythonUtil
pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN',
                                             'table', outputschema, 'test'),
                      spark._wrapped)

Then I get this error: AttributeError: 'StructField' object has no attribute '_get_object_id'
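From the trace below, my understanding is that py4j has no converter for Spark's Python schema objects, so its list converter picks the StructType up as a plain sequence and tries to push each StructField into a java.util.ArrayList; a StructField is a pure Python object with no _get_object_id, hence the failure. The sequence behaviour is easy to see:

# a StructType iterates as Python StructField objects, not JVM objects,
# so py4j cannot serialize references to them
for field in outputschema:
    print(type(field), field.name)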

Full stack trace:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-25-74a3b3e652e6> in <module>
      4 
      5 scala2_object= sc._jvm.com.aerospike.spark.PythonUtil
----> 6 pyspark.sql.DataFrame(scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table',smallSchema, 'test'), spark._wrapped)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1294 
   1295     def __call__(self, *args):
-> 1296         args_command, temp_args = self._build_args(*args)
   1297 
   1298         command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1258     def _build_args(self, *args):
   1259         if self.converters is not None and len(self.converters) > 0:
-> 1260             (new_args, temp_args) = self._get_args(args)
   1261         else:
   1262             new_args = args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _get_args(self, args)
   1245                 for converter in self.gateway_client.converters:
   1246                     if converter.can_convert(arg):
-> 1247                         temp_arg = converter.convert(arg, self.gateway_client)
   1248                         temp_args.append(temp_arg)
   1249                         new_args.append(temp_arg)

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_collections.py in convert(self, object, gateway_client)
    509         java_list = ArrayList()
    510         for element in object:
--> 511             java_list.add(element)
    512         return java_list
    513 

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1294 
   1295     def __call__(self, *args):
-> 1296         args_command, temp_args = self._build_args(*args)
   1297 
   1298         command = proto.CALL_COMMAND_NAME +\

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1264 
   1265         args_command = "".join(
-> 1266             [get_command_part(arg, self.pool) for arg in new_args])
   1267 
   1268         return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in <listcomp>(.0)
   1264 
   1265         args_command = "".join(
-> 1266             [get_command_part(arg, self.pool) for arg in new_args])
   1267 
   1268         return args_command, temp_args

~/.sdkman/candidates/spark/3.0.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    296             command_part += ";" + interface
    297     else:
--> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
    299 
    300     command_part += "\n"
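Is the right fix to rebuild the schema on the JVM side before the call? Something like the sketch below, which serializes the Python StructType to JSON and parses it back with org.apache.spark.sql.types.DataType.fromJson (assuming customdf accepts the resulting JVM StructType as-is):

# build a JVM StructType from the Python schema's JSON representation
jschema = sc._jvm.org.apache.spark.sql.types.DataType.fromJson(outputschema.json())

# pass the JVM schema object across the py4j bridge instead of the Python one
resultDF = pyspark.sql.DataFrame(
    scala2_object.customdf(customerIdsDF._jdf, 'SSN', 'table', jschema, 'test'),
    spark._wrapped)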





