Here is the PySpark script I used to see this difference.

On Mon, 28 Aug 2017 at 09:20 Lucas Pickup <lucas.tot0.pic...@gmail.com>
wrote:

> Hi all,
>
> Very sorry if people have already responded to this at
> lucas.pic...@microsoft.com; an INVALID identifier was attached to the
> end of the reply address for some reason, which may have caused replies
> to be lost.
>
> I've been messing around with Spark and PyArrow Parquet reading. In my
> testing I've found that a Parquet file written by Spark containing a
> datetime column yields different datetimes depending on whether Spark
> or PyArrow reads it back.
>
> The attached script demonstrates this.
>
> Output:
>
> Spark reading the Parquet file into a DataFrame:
>
> [Row(Date=datetime.datetime(2015, 7, 5, 23, 50)),
>  Row(Date=datetime.datetime(2015, 7, 5, 23, 30))]
>
> PyArrow table has dates as UTC (7 hours ahead):
>
> <pyarrow.lib.TimestampArray object at 0x0000029F3AFE79A8>
> [
>   Timestamp('2015-07-06 06:50:00')
> ]
>
> Pandas DF from pyarrow table has dates as UTC (7 hours ahead):
>
>                  Date
> 0 2015-07-06 06:50:00
> 1 2015-07-06 06:30:00
>
> I would've expected to end up with the same datetime from both readers,
> since no timezone was attached at any point; it's just a date and time
> value. Am I missing anything here? Or is this a bug?
>
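> If I add 7 hours to the Spark values (assuming the writing machine's
> clock was on US Pacific daylight time, UTC-7, inferred from the gap),
> the numbers line up exactly with what pyarrow reports:
>
> from datetime import datetime, timedelta
>
> # 2015-07-05 23:50 local plus the assumed 7-hour offset gives the
> # instant that pyarrow prints.
> print(datetime(2015, 7, 5, 23, 50) + timedelta(hours=7))
> # -> 2015-07-06 06:50:00
>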
> I attempted to intercept the timestamp values before pyarrow turns them
> into Python objects, so that I could add timezone information, which may
> fix this issue:
>
> The goal is to qualify the TimestampValues with a timezone (by creating
> a new column in the Arrow table based on the previous one). If this can
> be done before the values are converted to Python, it may fix the issue
> I was having. But it doesn't appear that I can create a new
> timestamp-typed column holding the values from the old timestamp column.
>
> Here is the code I'm using:
>
> def chunkedToArray(data):
>     for chunk in data.iterchunks():
>         for value in chunk:
>             yield value
>
> def datetimeColumnsAddTimezone(table):
>     for i, field in enumerate(table.schema):
>         if field.type == pa.timestamp('ns'):
>             newField = pa.field(field.name, pa.timestamp('ns', tz='GMT'),
>                                 field.nullable, field.metadata)
>             newArray = pa.array([val for val in chunkedToArray(table[i].data)],
>                                 pa.timestamp('ns', tz='GMT'))
>             newColumn = pa.Column.from_array(newField, newArray)
>             table = table.remove_column(i)
>             table = table.add_column(i, newColumn)
>     return table
>
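> For reference, a possible alternative that avoids round-tripping every
> value through Python: on a newer pyarrow than I'm running here (so
> treat this as an untested sketch), Array.cast and Table.set_column
> should let the column be relabelled in place:
>
> import pyarrow as pa
>
> def datetimeColumnsAddTimezoneCast(table, tz='GMT'):
>     # Sketch against a newer pyarrow API; cast() relabels the stored
>     # instants with a timezone rather than shifting them.
>     for i, field in enumerate(table.schema):
>         if field.type == pa.timestamp('ns'):
>             target = pa.timestamp('ns', tz=tz)
>             chunks = [c.cast(target) for c in table.column(i).iterchunks()]
>             newField = pa.field(field.name, target, field.nullable)
>             table = table.set_column(i, newField, pa.chunked_array(chunks))
>     return table
>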
> Cheers, Lucas Pickup
>
import pyspark
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import TimestampType, StructType, StructField
import pandas
import pyarrow.parquet as pq
from datetime import datetime

parquetOutputPath = "E:\\dev\\TestFiles\\date10-pyspark.parquet"

sqlC = SQLContext(pyspark.SparkContext.getOrCreate())

sparkRows = [Row(Date=datetime.strptime('2015-07-05 23:50:00', '%Y-%m-%d %H:%M:%S')),
             Row(Date=datetime.strptime('2015-07-05 23:30:00', '%Y-%m-%d %H:%M:%S'))]
sparkDFSchema = StructType([StructField("Date", TimestampType(), True)])

sparkDF = sqlC.createDataFrame(sparkRows, sparkDFSchema)
print("Spark DF has correct datetimes")
print(sparkDF.take(10))

sparkDF.write.parquet(parquetOutputPath)

sparkDFFromParquet = sqlC.read.parquet(parquetOutputPath)
print("Spark DF from written Parquet has correct datetimes")
print(sparkDFFromParquet.take(10))

pyarrowTable = pq.read_table(parquetOutputPath)
print("PyArrow table has dates as UTC (7 hours ahead)")
print(pyarrowTable.column(0).data.chunk(0))

pyarrowDF = pyarrowTable.to_pandas()
print("Pandas DF from pyarrow table has dates as UTC (7 hours ahead)")
print(pyarrowDF)
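
# A possible workaround on the pandas side (a sketch; 'America/Los_Angeles'
# is an assumption inferred from the 7-hour offset): treat the values as
# UTC, shift them to the writer's local zone, then drop the tz again so
# they match Spark's output.
fixedDates = (pyarrowDF['Date']
              .dt.tz_localize('UTC')
              .dt.tz_convert('America/Los_Angeles')
              .dt.tz_localize(None))
print("Pandas dates shifted back to the writer's local clock")
print(fixedDates)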
