I need to convert timestamps into a format I can use with matplotlib's
plot_date(). epoch2num() works fine if I use it in my driver, but I get a
numpy constructor error if I use it in a UDF.
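
For example, this kind of call behaves as expected on the driver (a minimal
sketch; 1457552655 is one of the 'ms' values shown further down):

from matplotlib.dates import epoch2num, num2date

# convert seconds-since-1970 into matplotlib's day number for plot_date()
d = epoch2num(1457552655)
print(d, type(d), num2date(d))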

Any idea what the problem is?

Thanks

Andy

P.S. I am using Python 3 and Spark 1.6.

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, DoubleType, DecimalType


import pandas as pd
import numpy as np

from matplotlib.dates import epoch2num

gdf1 = cdf1.selectExpr("count", "row_key", "created",
                       "unix_timestamp(created) as ms")
gdf1.printSchema()
gdf1.show(10, truncate=False)
root
 |-- count: long (nullable = true)
 |-- row_key: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- ms: long (nullable = true)

+-----+---------------+---------------------+----------+
|count|row_key        |created              |ms        |
+-----+---------------+---------------------+----------+
|1    |HillaryClinton |2016-03-09 11:44:15.0|1457552655|
|2    |HillaryClinton |2016-03-09 11:44:30.0|1457552670|
|1    |HillaryClinton |2016-03-09 11:44:45.0|1457552685|
|2    |realDonaldTrump|2016-03-09 11:44:15.0|1457552655|
|1    |realDonaldTrump|2016-03-09 11:44:30.0|1457552670|
|1    |realDonaldTrump|2016-03-09 11:44:45.0|1457552685|
|3    |realDonaldTrump|2016-03-09 11:45:00.0|1457552700|
+-----+---------------+---------------------+----------+


def foo(e):
    return epoch2num(e)

epoch2numUDF = udf(foo, FloatType())
#epoch2numUDF = udf(lambda e: epoch2num(e), FloatType())
#epoch2numUDF = udf(lambda e: e + 5000000.5, FloatType())

gdf2 = gdf1.withColumn("date", epoch2numUDF(gdf1.ms))
gdf2.printSchema()
gdf2.show(truncate=False)


Py4JJavaError: An error occurred while calling o925.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage
32.0 (TID 91, localhost): net.razorvine.pickle.PickleException: expected
zero arguments for construction of ClassDict (for numpy.dtype)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)

It works fine if I use pandas:

pdf = gdf1.toPandas()
pdf['date'] = epoch2num(pdf['ms'])
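
In case it helps narrow things down: if the problem is that epoch2num()
returns a NumPy float that the worker-side unpickler can't convert into a
Spark value (just a guess from the "ClassDict (for numpy.dtype)" message),
then a sketch of a possible fix would be to cast the result to a plain
Python float and declare the UDF's return type as DoubleType:

def foo(e):
    # cast the NumPy value to a plain Python float before it gets pickled
    return float(epoch2num(e))

epoch2numUDF = udf(foo, DoubleType())

gdf2 = gdf1.withColumn("date", epoch2numUDF(gdf1.ms))
gdf2.show(truncate=False)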



