Hi Andrew,
sorry, but to me it seems S3 is the culprit...
I downloaded your JSON file and stored it locally, then wrote this simple
app (a subset of what you have in your GitHub repo; sorry, I'm a little bit
rusty on how to create a new column out of existing ones) which basically
reads the JSON file.
It's in Scala, but the Python equivalent shouldn't be difficult (I have put
a rough PySpark sketch after the Scala code below).
I noticed that in your schema you left out the timezone column... was that
intentional?
I ran it with Spark 2.0 and, similarly, with 1.6 and found no issues
reading the data. If I have some time I'll store your JSON in one of my S3
buckets and read it via Spark from EC2.
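When I do, I'd expect something along these lines to work. This is an
untested PySpark sketch: the bucket and key are the ones from your mail,
and the s3a scheme assumes the Hadoop AWS connector and credentials are
configured on the cluster.

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "S3ReadTest")
sqlContext = SQLContext(sc)

# read the JSON lines straight from S3 and let Spark infer the schema
df = sqlContext.read.json("s3a://time-waits-for-no-man/1973-01-11")
df.printSchema()
print("rows: %d" % df.count())

Anyway, here is the Scala code I ran against the local copy: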
object SimpleApp {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.types._
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // no schema: let Spark infer it from the JSON lines
    val jsonContentNoSchema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
    jsonContentNoSchema.printSchema()
    println(s"The JSON content with no schema has ${jsonContentNoSchema.count()} rows")

    // with an explicit schema
    val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")
    val schema = (new StructType)
      .add("hour", StringType).add("month", StringType)
      .add("second", StringType).add("year", StringType)
      .add("timezone", StringType).add("day", StringType)
      .add("minute", StringType)
    val jsonContentWithSchema = sqlContext.jsonRDD(jsonRdd, schema)
    println(s"----- And the JSON with schema has ${jsonContentWithSchema.count()} rows")
  }
}
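For reference, a rough PySpark equivalent of the above (untested, so treat
it as a sketch; it uses the same local file path and the same columns,
including timezone):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StringType

conf = SparkConf().setAppName("Simple Application").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# no schema: let Spark infer it from the JSON lines
json_no_schema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
json_no_schema.printSchema()
print("The JSON content with no schema has %d rows" % json_no_schema.count())

# with an explicit schema
schema = (StructType()
          .add("hour", StringType()).add("month", StringType())
          .add("second", StringType()).add("year", StringType())
          .add("timezone", StringType()).add("day", StringType())
          .add("minute", StringType()))
json_with_schema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json",
                                        schema=schema)
print("----- And the JSON with schema has %d rows" % json_with_schema.count())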
hope this helps
kr
marco
On Mon, Nov 28, 2016 at 2:48 PM, Andrew Holway <
[email protected]> wrote:
> I extracted out the boto bits and tested in vanilla python on the nodes. I
> am pretty sure that the data from S3 is ok. I've applied a public policy to
> the bucket s3://time-waits-for-no-man. There is a publicly available object
> here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11
>
> I'm using boto because using proxies with spark and hadoop in general is a
> bit of a black art.
>
>
> [centos@hadoop002 ~]$ python s3_test.py
> object key
> 1973-01-11
> Length of List
> 86400
> First row
> {u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00',
> u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
> Last row
> {u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59',
> u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}
> [centos@hadoop002 ~]$ cat s3_test.py
> import boto3
> import ujson
> import arrow
> import sys
> import os
> import getpass
>
> os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'
>
> def add_timestamp(dict):
>     dict['timestamp'] = arrow.get(
>         int(dict['year']),
>         int(dict['month']),
>         int(dict['day']),
>         int(dict['hour']),
>         int(dict['minute']),
>         int(dict['second'])
>     ).timestamp
>     return dict
>
> s3_list = []
> s3 = boto3.resource('s3')
> my_bucket = s3.Bucket('time-waits-for-no-man')
> for object in my_bucket.objects.filter(Prefix='1973-01-11'):
>     s3_list.append(object.key)
>
> print("object key")
> print (s3_list[0])
>
> s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man',
> key=s3_list[0])
> contents = s3obj.get()['Body'].read().decode()
> meow = contents.splitlines()
> result_wo_timestamp = map(ujson.loads, meow)
> result_wo_timestamp[0]
> result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
>
> print("Length of List")
> print(len(result_wi_timestamp))
> print("First row")
> print(result_wi_timestamp[0])
> print("Last row")
> print(result_wi_timestamp[86399])
>
>
>
>
> On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <[email protected]>
> wrote:
>
>> Hi
>>
>> pickle errors normally point to a serialisation issue. I am suspecting
>> something wrong with your S3 data, but it's just a wild guess...
>>
>> Is your s3 object publicly available?
>>
>> A few suggestions to nail down the problem:
>>
>> 1 - try to see if you can read your object from s3 using the boto3
>> library 'offline', meaning not in Spark code
>>
>> 2 - try to replace your distributedJsonRead: instead of reading from s3,
>> generate a string from a snippet of your json object
>>
>> 3 - Spark can read data from s3 as well; just do sc.textFile('s3://...') ==>
>> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark.
>> Try to use Spark entirely to read and process the data, rather than going
>> via boto3, which adds extra complexity you don't need.
>>
>> If you send a snippet of your json content, then everyone on the list can
>> run the code and try to reproduce it.
>>
>>
>> hth
>>
>> Marco
>>
>>
>> On 27 Nov 2016 7:33 pm, "Andrew Holway" <[email protected]>
>> wrote:
>>
>>> I get a slightly different error when not specifying a schema:
>>>
>>> Traceback (most recent call last):
>>> File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>> line 61, in <module>
>>> df = sqlContext.createDataFrame(foo)
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>> line 299, in createDataFrame
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 520, in createDataFrame
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 360, in _createFromRDD
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 331, in _inferSchema
>>> File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 1328, in first
>>> File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 1310, in take
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py",
>>> line 941, in runJob
>>> File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2403, in _jrdd
>>> File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2336, in _wrap_function
>>> File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2315, in _prepare_for_python_RDD
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 428, in dumps
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 657, in dumps
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 107, in dump
>>> File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>> self.save(obj)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>> save(element)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>> save(element)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>> self._batch_appends(iter(obj))
>>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>> save(x)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>> save(element)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>> self._batch_appends(iter(obj))
>>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>> save(x)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>> save(element)
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>> self._batch_appends(iter(obj))
>>> File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>> save(tmp[0])
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 198, in save_function
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 246, in save_function_tuple
>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>> f(self, obj) # Call unbound method with explicit self
>>> File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>> self._batch_setitems(obj.iteritems())
>>> File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>> save(v)
>>> File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>> rv = reduce(self.proto)
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>> line 933, in __call__
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>> line 63, in deco
>>> File
>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>> line 316, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> o33.__getnewargs__. Trace:
>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>> at py4j.Gateway.invoke(Gateway.java:272)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can anyone tell me what is causing this error
>>>> Spark 2.0.0
>>>> Python 2.7.5
>>>>
>>>> df = sqlContext.createDataFrame(foo, schema)
>>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>>
>>>> Traceback (most recent call last):
>>>> File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>>> line 61, in <module>
>>>> df = sqlContext.createDataFrame(foo, schema)
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>>> line 299, in createDataFrame
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>>> line 523, in createDataFrame
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2220, in _to_java_object_rdd
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2403, in _jrdd
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2336, in _wrap_function
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2315, in _prepare_for_python_RDD
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>>> line 428, in dumps
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 657, in dumps
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 107, in dump
>>>> File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>> self.save(obj)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>> save(element)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>> save(element)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>> self._batch_appends(iter(obj))
>>>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>> save(x)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>> save(element)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>> self._batch_appends(iter(obj))
>>>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>> save(x)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>> save(element)
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>> self._batch_appends(iter(obj))
>>>> File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>> save(tmp[0])
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 198, in save_function
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 246, in save_function_tuple
>>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>> f(self, obj) # Call unbound method with explicit self
>>>> File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>> self._batch_setitems(obj.iteritems())
>>>> File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>> save(v)
>>>> File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>> rv = reduce(self.proto)
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>>> line 933, in __call__
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>>> line 63, in deco
>>>> File
>>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>>> line 316, in get_return_value
>>>> py4j.protocol.Py4JError: An error occurred while calling
>>>> o33.__getnewargs__. Trace:
>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.
>>>> java:318)
>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.
>>>> java:326)
>>>> at py4j.Gateway.invoke(Gateway.java:272)
>>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>>
>>>> --
>>>> Otter Networks UG
>>>> http://otternetworks.de
>>>> Gotenstraße 17
>>>> 10829 Berlin
>>>>
>>>
>>>
>>>
>>> --
>>> Otter Networks UG
>>> http://otternetworks.de
>>> Gotenstraße 17
>>> 10829 Berlin
>>>
>>
>
>
> --
> Otter Networks UG
> http://otternetworks.de
> Gotenstraße 17
> 10829 Berlin
>