How could I access the first element of the holiday column?

I tried the following code, but it doesn't work:
start_date_test2.withColumn("diff", datediff(start_date_test2.start_date,
start_date_test2.holiday[0])).show()

On Tue, Apr 25, 2017 at 10:20 PM, Zeming Yu <zemin...@gmail.com> wrote:

> Got it working now!
>
> Does anyone have a PySpark example of how to calculate the number of days
> from the nearest holiday based on an array column?
>
> I.e. from this table
>
> +----------+-----------------------+
> |start_date|holiday                |
> +----------+-----------------------+
> |2017-08-11|[2017-05-30,2017-10-01]|
>
>
> compute a column called "days_from_nearest_holiday" that holds the
> difference in days between 11 Aug 2017 and 1 Oct 2017?
>
>
>
>
>
> On Tue, Apr 25, 2017 at 6:00 PM, Wen Pei Yu <yuw...@cn.ibm.com> wrote:
>
>> TypeError: unorderable types: str() >= datetime.date()
>>
>> You should convert the string to a Date type before comparing.
>>
>> Yu Wenpei.
>>
>>
>> ----- Original message -----
>> From: Zeming Yu <zemin...@gmail.com>
>> To: user <user@spark.apache.org>
>> Cc:
>> Subject: how to find the nearest holiday
>> Date: Tue, Apr 25, 2017 3:39 PM
>>
>> I have a column of dates (date type) and am just trying to find the nearest
>> holiday for each date. Does anyone have any idea what went wrong below?
>>
>>
>>
>> start_date_test = flight3.select("start_date").distinct()
>> start_date_test.show()
>>
>> holidays = ['2017-09-01', '2017-10-01']
>>
>> +----------+
>> |start_date|
>> +----------+
>> |2017-08-11|
>> |2017-09-11|
>> |2017-09-28|
>> |2017-06-29|
>> |2017-09-29|
>> |2017-07-31|
>> |2017-08-14|
>> |2017-08-18|
>> |2017-04-09|
>> |2017-09-21|
>> |2017-08-10|
>> |2017-06-30|
>> |2017-08-19|
>> |2017-07-06|
>> |2017-06-28|
>> |2017-09-14|
>> |2017-08-08|
>> |2017-08-22|
>> |2017-07-03|
>> |2017-07-30|
>> +----------+
>> only showing top 20 rows
>>
>>
>>
>> index = spark.sparkContext.broadcast(sorted(holidays))
>>
>> def nearest_holiday(date):
>>     last_holiday = index.value[0]
>>     for next_holiday in index.value:
>>         if next_holiday >= date:
>>             break
>>         last_holiday = next_holiday
>>     if last_holiday > date:
>>         last_holiday = None
>>     if next_holiday < date:
>>         next_holiday = None
>>     return (last_holiday, next_holiday)
>>
>>
>> from pyspark.sql.types import *
>> return_type = StructType([StructField('last_holiday', StringType()),
>> StructField('next_holiday', StringType())])
>>
>> from pyspark.sql.functions import udf
>> nearest_holiday_udf = udf(nearest_holiday, return_type)
>>
>> start_date_test.withColumn('holiday', 
>> nearest_holiday_udf('start_date')).show(5,
>> False)
>>
>>
>> here's the error I got:
>>
>> ------------------------------------------------------------
>> ---------------
>> Py4JJavaError                             Traceback (most recent call
>> last)
>> <ipython-input-40-33fd4d7e8c8a> in <module>()
>>      24 nearest_holiday_udf = udf(nearest_holiday, return_type)
>>      25
>> ---> 26 start_date_test.withColumn('holiday', nearest_holiday_udf(
>> 'start_date')).show(5, False)
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\pyspark\sql\dataframe.py in show(self, n, truncate)
>>     318             print(self._jdf.showString(n, 20))
>>     319         else:
>> --> 320             print(self._jdf.showString(n, int(truncate)))
>>     321
>>     322     def __repr__(self):
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args)
>>    1131         answer = self.gateway_client.send_command(command)
>>    1132         return_value = get_return_value(
>> -> 1133             answer, self.gateway_client, self.target_id,
>> self.name)
>>    1134
>>    1135         for temp_arg in temp_args:
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\pyspark\sql\utils.py in deco(*a, **kw)
>>      61     def deco(*a, **kw):
>>      62         try:
>> ---> 63             return f(*a, **kw)
>>      64         except py4j.protocol.Py4JJavaError as e:
>>      65             s = e.java_exception.toString()
>>
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pytho
>> n\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer,
>> gateway_client, target_id, name)
>>     317                 raise Py4JJavaError(
>>     318                     "An error occurred while calling
>> {0}{1}{2}.\n".
>> --> 319                     format(target_id, ".", name), value)
>>     320             else:
>>     321                 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling o566.showString.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in stage 98.0 failed 1 times, most recent failure: Lost task 0.0 in
>> stage 98.0 (TID 521, localhost, executor driver):
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 174, in main
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 169, in process
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\serializers.py", line 220, in dump_stream
>>     self.serializer.dump_stream(self._batched(iterator), stream)
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\serializers.py", line 138, in dump_stream
>>     for obj in iterator:
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\serializers.py", line 209, in _batched
>>     for item in iterator:
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 92, in <lambda>
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 68, in <lambda>
>>   File "<ipython-input-40-33fd4d7e8c8a>", line 10, in nearest_holiday
>> TypeError: unorderable types: str() >= datetime.date()
>>
>> at org.apache.spark.api.python.PythonRunner$$anon$1.read(Python
>> RDD.scala:193)
>> at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(Pyth
>> onRDD.scala:234)
>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
>> at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$
>> anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
>> at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$
>> anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$
>> apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$
>> apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> at java.lang.Thread.run(Unknown Source)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$sch
>> eduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$
>> 1.apply(DAGScheduler.scala:1423)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$
>> 1.apply(DAGScheduler.scala:1422)
>> at scala.collection.mutable.ResizableArray$class.foreach(Resiza
>> bleArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGSchedu
>> ler.scala:1422)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS
>> etFailed$1.apply(DAGScheduler.scala:802)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskS
>> etFailed$1.apply(DAGScheduler.scala:802)
>> at scala.Option.foreach(Option.scala:257)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:802)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOn
>> Receive(DAGScheduler.scala:1650)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onRe
>> ceive(DAGScheduler.scala:1605)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onRe
>> ceive(DAGScheduler.scala:1594)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>> at org.apache.spark.sql.execution.SparkPlan.executeTake(
>> SparkPlan.scala:333)
>> at org.apache.spark.sql.execution.CollectLimitExec.executeColle
>> ct(limit.scala:38)
>> at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$D
>> ataset$$execute$1$1.apply(Dataset.scala:2371)
>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutio
>> nId(SQLExecution.scala:57)
>> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
>> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$e
>> xecute$1(Dataset.scala:2370)
>> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$c
>> ollect(Dataset.scala:2377)
>> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
>> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
>> at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
>> at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
>> at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
>> at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
>> at sun.reflect.GeneratedMethodAccessor89.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.lang.reflect.Method.invoke(Unknown Source)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at py4j.Gateway.invoke(Gateway.java:280)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:214)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.spark.api.python.PythonException: Traceback (most
>> recent call last):
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 174, in main
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 169, in process
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\serializers.py", line 220, in dump_stream
>>     self.serializer.dump_stream(self._batched(iterator), stream)
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\serializers.py", line 138, in dump_stream
>>     for obj in iterator:
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\serializers.py", line 209, in _batched
>>     for item in iterator:
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 92, in <lambda>
>>   File "C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\pyth
>> on\lib\pyspark.zip\pyspark\worker.py", line 68, in <lambda>
>>   File "<ipython-input-40-33fd4d7e8c8a>", line 10, in nearest_holiday
>> TypeError: unorderable types: str() >= datetime.date()
>>
>> at org.apache.spark.api.python.PythonRunner$$anon$1.read(Python
>> RDD.scala:193)
>> at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(Pyth
>> onRDD.scala:234)
>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
>> at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$
>> anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
>> at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$
>> anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$
>> apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$
>> apply$23.apply(RDD.scala:796)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> ... 1 more
>>
>>
>>
>>
>>
>>
>>
>>
>

Reply via email to