I extracted the boto bits and tested them in vanilla Python on the nodes; I am pretty sure the data coming from S3 is OK. I've applied a public policy to the bucket s3://time-waits-for-no-man, and there is a publicly available object here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11
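For anyone who wants to verify the public access independently, a minimal sketch that fetches the object over plain HTTPS using only the Python 2 standard library (the expected row count is taken from the script output below; urllib2 picks up HTTPS_PROXY from the environment if a proxy is needed):

import urllib2

# Sanity check: fetch the public S3 object directly, no AWS credentials or boto3.
url = 'https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11'
body = urllib2.urlopen(url).read()
lines = body.splitlines()
print(len(lines))  # should be 86400: one JSON record per second of the day
print(lines[0])    # the hour 00 / minute 00 / second 00 record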
I'm using boto because using proxies with Spark and Hadoop in general is a bit of a black art.

[centos@hadoop002 ~]$ python s3_test.py
object key
1973-01-11
Length of List
86400
First row
{u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
Last row
{u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}

[centos@hadoop002 ~]$ cat s3_test.py
import os

import arrow
import boto3
import ujson

# Outbound HTTPS traffic has to go through the corporate proxy.
os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'


def add_timestamp(record):
    # Derive a Unix timestamp from the record's individual date/time fields.
    record['timestamp'] = arrow.get(
        int(record['year']), int(record['month']), int(record['day']),
        int(record['hour']), int(record['minute']), int(record['second'])
    ).timestamp
    return record


# Collect the keys of all objects matching the prefix.
s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for obj in my_bucket.objects.filter(Prefix='1973-01-11'):
    s3_list.append(obj.key)
print("object key")
print(s3_list[0])

# Download the first object and parse one JSON record per line.
s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man', key=s3_list[0])
contents = s3obj.get()['Body'].read().decode()
meow = contents.splitlines()
result_wo_timestamp = map(ujson.loads, meow)
result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
print("Length of List")
print(len(result_wi_timestamp))
print("First row")
print(result_wi_timestamp[0])
print("Last row")
print(result_wi_timestamp[86399])
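Since this standalone script works, the pickle error is probably not coming from the S3 data itself. One pattern that avoids the serialisation trap when moving this kind of read into Spark is to ship only the object keys to the executors and build the boto3 resource inside the mapped function, so that no unpicklable driver-side object is captured in the closure. A rough sketch under those assumptions; distributed_json_read is a hypothetical name, not the actual distributedJsonRead from the gist, sc is the usual PySpark context, and boto3/ujson/arrow must be installed on the executors:

def distributed_json_read(key):
    # Hypothetical per-executor reader: boto3 is imported and the resource
    # is created inside the function, so the shipped closure stays picklable.
    import boto3
    import ujson
    obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man', key=key)
    contents = obj.get()['Body'].read().decode()
    return [add_timestamp(ujson.loads(line)) for line in contents.splitlines()]

keys = sc.parallelize(s3_list, len(s3_list))  # one partition per object key
records = keys.flatMap(distributed_json_read)
print(records.count())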
On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hi,
>
> Pickle errors normally point to a serialisation issue. I suspect
> something is wrong with your S3 data, but that is just a wild guess...
>
> Is your S3 object publicly available?
>
> A few suggestions to nail down the problem:
>
> 1 - Try to see if you can read your object from S3 using the boto3
> library 'offline', meaning not in Spark code.
>
> 2 - Try to replace your distributedJsonRead: instead of reading from S3,
> generate a string out of a snippet of your JSON object.
>
> 3 - Spark can read data from S3 as well; just do sc.textFile('s3://...') ==>
> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark
> Try to use Spark entirely to read and process the data, rather than going
> via boto3, which adds an extra complexity you don't need (see the sketch
> after this reply).
>
> If you send a snippet of your JSON content, then everyone on the list can
> run the code and try to reproduce the problem.
>
> hth
>
> Marco
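A minimal sketch of suggestion 3, assuming the cluster's Hadoop S3 connector is configured (the s3a:// scheme and the credentials setup depend on the installation, and add_timestamp is the helper from s3_test.py above, assumed to be defined in the Spark session):

from pyspark.sql import Row
import ujson

# Let Spark read the object(s) directly instead of going through boto3.
raw = sc.textFile('s3a://time-waits-for-no-man/1973-01-11')
records = raw.map(ujson.loads).map(add_timestamp)
df = sqlContext.createDataFrame(records.map(lambda d: Row(**d)))
df.show(5)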
>
> On 27 Nov 2016 7:33 pm, "Andrew Holway" <andrew.hol...@otternetworks.de> wrote:
>
>> I get a slightly different error when not specifying a schema:
>>
>> Traceback (most recent call last):
>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
>>     df = sqlContext.createDataFrame(foo)
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 520, in createDataFrame
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 360, in _createFromRDD
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 331, in _inferSchema
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1328, in first
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1310, in take
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py", line 941, in runJob
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>     self.save(obj)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>     save(x)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>     save(x)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>     save(tmp[0])
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>     self._batch_setitems(obj.iteritems())
>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>     save(v)
>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>     rv = reduce(self.proto)
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>     at py4j.Gateway.invoke(Gateway.java:272)
>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>     at java.lang.Thread.run(Thread.java:745)
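For what it's worth, this Py4JError is a classic symptom rather than an S3 problem: pickle has hit a py4j JavaObject (the o33 in the message) while serialising a function for _prepare_for_python_RDD, and JavaObjects such as the SparkContext, SQLContext, or a DataFrame cannot be pickled. The save_dict/_batch_setitems frames at the end suggest cloudpickle was saving the function's globals when it tripped. A minimal sketch of one common trigger (hypothetical; the tick.py in the gist may differ):

# Referencing a driver-side JavaObject inside a function Spark must ship to
# the executors reproduces "Method __getnewargs__([]) does not exist":
def bad_mapper(x):
    return sqlContext.createDataFrame([(x,)])  # sqlContext is not picklable

# sc.parallelize([1, 2, 3]).map(bad_mapper).count()  # raises the Py4JError

The fix is to keep sc, sqlContext, and anything built from them out of functions passed to map/flatMap and out of the data handed to createDataFrame.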
>>
>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <andrew.hol...@otternetworks.de> wrote:
>>
>>> Hi,
>>>
>>> Can anyone tell me what is causing this error?
>>> Spark 2.0.0
>>> Python 2.7.5
>>>
>>> df = sqlContext.createDataFrame(foo, schema)
>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>
>>> Traceback (most recent call last):
>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
>>>     df = sqlContext.createDataFrame(foo, schema)
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 523, in createDataFrame
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2220, in _to_java_object_rdd
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>     self.save(obj)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>     save(x)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>     save(x)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>     save(tmp[0])
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>     self._batch_setitems(obj.iteritems())
>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>     save(v)
>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>     rv = reduce(self.proto)
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>     at py4j.Gateway.invoke(Gateway.java:272)
>>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>     at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>     at java.lang.Thread.run(Thread.java:745)

--
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin