Here is the pyspark script I used to see this difference.

On Mon, 28 Aug 2017 at 09:20 Lucas Pickup <lucas.tot0.pic...@gmail.com> wrote:
> Hi all,
>
> Very sorry if people already responded to this at lucas.pic...@microsoft.com.
> There was an INVALID identifier attached to the end of the reply address for
> some reason, which may have caused replies to be lost.
>
> I've been messing around with Spark and PyArrow Parquet reading. In my
> testing I've found that a Parquet file written by Spark containing a
> datetime column results in different datetimes from Spark and PyArrow.
>
> The attached script demonstrates this.
>
> Output:
>
> Spark reading the parquet file into a DataFrame:
>
> [Row(Date=datetime.datetime(2015, 7, 5, 23, 50)),
>  Row(Date=datetime.datetime(2015, 7, 5, 23, 30))]
>
> PyArrow table has dates as UTC (7 hours ahead):
>
> <pyarrow.lib.TimestampArray object at 0x0000029F3AFE79A8>
> [
>   Timestamp('2015-07-06 06:50:00')
> ]
>
> Pandas DF from pyarrow table has dates as UTC (7 hours ahead):
>
>                  Date
> 0 2015-07-06 06:50:00
> 1 2015-07-06 06:30:00
>
> I would've expected to end up with the same datetime from both readers,
> since there was no timezone attached at any point. It's just a date and
> time value. Am I missing anything here? Or is this a bug?
>
> I attempted to intercept the timestamp values before pyarrow turns them
> into Python objects, so I could add timezone information, which may fix
> this issue.
>
> The goal is to qualify the TimestampValue with a timezone (by creating a
> new column in the arrow table based off the previous one). If this can be
> done before the values are converted to Python, it may fix the issue I was
> having. But it doesn't appear that I can create a new timestamp-typed
> column with the values from the old timestamp column.
>
> Here is the code I'm using:
>
> import pyarrow as pa
>
> def chunkedToArray(data):
>     # Flatten a chunked column into a stream of scalar values.
>     for chunk in data.iterchunks():
>         for value in chunk:
>             yield value
>
> def datetimeColumnsAddTimezone(table):
>     # Replace each naive ns-timestamp column with a GMT-qualified copy.
>     for i, field in enumerate(table.schema):
>         if field.type == pa.timestamp('ns'):
>             newField = pa.field(field.name, pa.timestamp('ns', tz='GMT'),
>                                 field.nullable, field.metadata)
>             newArray = pa.array([val for val in chunkedToArray(table[i].data)],
>                                 pa.timestamp('ns', tz='GMT'))
>             newColumn = pa.Column.from_array(newField, newArray)
>             table = table.remove_column(i)
>             table = table.add_column(i, newColumn)
>     return table
>
> Cheers,
> Lucas Pickup
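The attempt quoted above stalls at pa.array, which round-trips every value through Python scalars. On newer pyarrow releases (where Table.set_column and ChunkedArray.cast exist; the Column class used above was removed in later versions), the same relabeling can be done as a column-level cast with no Python objects involved. A minimal sketch under that assumption, not what the 2017-era pyarrow exposed:

import pyarrow as pa

def timestamp_columns_add_timezone(table, tz='UTC'):
    # Return a copy of `table` whose naive timestamp columns carry `tz`.
    for i, field in enumerate(table.schema):
        if pa.types.is_timestamp(field.type) and field.type.tz is None:
            new_type = pa.timestamp(field.type.unit, tz=tz)
            # Cast at the column level; no Python-object round trip needed.
            table = table.set_column(
                i,
                pa.field(field.name, new_type, field.nullable),
                table.column(i).cast(new_type),
            )
    return table

Casting a naive timestamp column to a tz-aware type leaves the stored values untouched and merely marks them as UTC instants, which is the qualification the original function was reaching for.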
import pyspark
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import TimestampType, StructType, StructField
import pandas
import pyarrow.parquet as pq
from datetime import datetime

parquetOutputPath = "E:\\dev\\TestFiles\\date10-pyspark.parquet"

sqlC = SQLContext(pyspark.SparkContext.getOrCreate())

# Build a DataFrame with two naive (timezone-less) timestamps.
sparkRows = [Row(Date=datetime.strptime('2015-07-05 23:50:00', '%Y-%m-%d %H:%M:%S')),
             Row(Date=datetime.strptime('2015-07-05 23:30:00', '%Y-%m-%d %H:%M:%S'))]
sparkDFSchema = StructType([StructField("Date", TimestampType(), True)])
sparkDF = sqlC.createDataFrame(sparkRows, sparkDFSchema)
print("Spark DF has correct datetimes")
print(sparkDF.take(10))

# Write to Parquet, then read the same file back three ways.
sparkDF.write.parquet(parquetOutputPath)

sparkDFFromParquet = sqlC.read.parquet(parquetOutputPath)
print("Spark DF from written Parquet has correct datetimes")
print(sparkDFFromParquet.take(10))

pyarrowTable = pq.read_table(parquetOutputPath)
print("PyArrow table has dates as UTC (7 hours ahead)")
print(pyarrowTable.column(0).data.chunk(0))

pyarrowDF = pyarrowTable.to_pandas()
print("Pandas DF from pyarrow table has dates as UTC (7 hours ahead)")
print(pyarrowDF)
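If the goal is just to make the pandas frame agree with Spark, the shift can also be undone after to_pandas(). A sketch, assuming the writing session's timezone was US Pacific ('America/Los_Angeles' is a guess consistent with the 7-hour gap in the output, not something the thread confirms):

import pyarrow.parquet as pq

# pyarrow hands back the raw stored instants, which Spark wrote as UTC.
# Re-label them as UTC, shift into the writer's local timezone, then drop
# the tz info to recover the naive local times Spark reports.
df = pq.read_table(parquetOutputPath).to_pandas()
df['Date'] = (df['Date']
              .dt.tz_localize('UTC')
              .dt.tz_convert('America/Los_Angeles')  # assumed writer timezone
              .dt.tz_localize(None))
print(df)  # should now match Spark's 2015-07-05 23:50 / 23:30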