Hi Marco, I was not able to find out what was causing the problem, but a "git stash" seems to have fixed it :/
Thanks for your help... :)

On Mon, Nov 28, 2016 at 10:50 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hi Andrew,
> sorry, but to me it seems s3 is the culprit....
> I have downloaded your json file and stored it locally, then wrote this
> simple app (a subset of what you have in your github; sorry, i am a little
> bit rusty on how to create a new column out of existing ones) which
> basically reads the json file.
> It's in Scala, but the Python equivalent shouldn't be difficult.
> i noticed that in your schema you forgot the timezone column.... was that
> intentional?
> Anyway, below is the code. i ran it with Spark 2.0 and similarly 1.6...
> found no issues in reading the data. If i have some time i'll try to store
> your json on one of my s3 buckets and read it via spark from EC2
>
>   def main(args: Array[String]) = {
>     import org.apache.spark.sql.types._
>     import org.apache.spark.SparkContext
>     import org.apache.spark.SparkConf
>     import org.apache.spark.rdd._
>     import org.apache.spark.SparkContext._
>     import org.apache.spark.sql._
>
>     val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>
>     // no schema: let spark infer it
>     val jsonContentNoSchema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
>     jsonContentNoSchema.printSchema()
>     println(s"The JsonContent with no schema has ${jsonContentNoSchema.count()} rows")
>
>     // with an explicit schema
>     val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")
>     import sqlContext.implicits._
>
>     val schema = (new StructType).add("hour", StringType).add("month", StringType)
>       .add("second", StringType).add("year", StringType)
>       .add("timezone", StringType).add("day", StringType)
>       .add("minute", StringType)
>
>     val jsonContentWithSchema = sqlContext.jsonRDD(jsonRdd, schema)
>     println(s"----- And the Json with schema has ${jsonContentWithSchema.count()} rows")
>   }
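> a quick python translation of the same test, untested but it should be
> close (same local file path; the schema mirrors the scala one, timezone
> included):
>
>   from pyspark import SparkConf, SparkContext
>   from pyspark.sql import SQLContext
>   from pyspark.sql.types import StructType, StructField, StringType
>
>   conf = SparkConf().setAppName("Simple Application").setMaster("local")
>   sc = SparkContext(conf=conf)
>   sqlContext = SQLContext(sc)
>
>   # no schema: let spark infer it from the json itself
>   json_no_schema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
>   json_no_schema.printSchema()
>   print("The json content with no schema has %d rows" % json_no_schema.count())
>
>   # with an explicit schema, all columns read as strings
>   columns = ["hour", "month", "second", "year", "timezone", "day", "minute"]
>   schema = StructType([StructField(c, StringType()) for c in columns])
>   json_with_schema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json", schema=schema)
>   print("----- And the json with schema has %d rows" % json_with_schema.count())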
> hope this helps
> kr
> marco
>
> On Mon, Nov 28, 2016 at 2:48 PM, Andrew Holway <andrew.hol...@otternetworks.de> wrote:
>
>> I extracted the boto bits and tested them in vanilla python on the nodes.
>> I am pretty sure that the data from S3 is ok. I've applied a public policy
>> to the bucket s3://time-waits-for-no-man. There is a publicly available
>> object here:
>> https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11
>>
>> I'm using boto because using proxies with spark and hadoop in general is
>> a bit of a black art.
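>> For reference, the s3a proxy settings I'm aware of are fs.s3a.proxy.host
>> and fs.s3a.proxy.port from hadoop-aws. Untested on our cluster, but in
>> theory something like this would let spark read s3a:// URLs through the
>> proxy directly:
>>
>>   # untested sketch: route s3a traffic through the web proxy via the
>>   # Hadoop configuration, then let spark read the bucket itself
>>   hadoop_conf = sc._jsc.hadoopConfiguration()
>>   hadoop_conf.set("fs.s3a.proxy.host", "webproxy")
>>   hadoop_conf.set("fs.s3a.proxy.port", "8080")
>>   rdd = sc.textFile("s3a://time-waits-for-no-man/1973-01-11")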
>>
>> [centos@hadoop002 ~]$ python s3_test.py
>> object key
>> 1973-01-11
>> Length of List
>> 86400
>> First row
>> {u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
>> Last row
>> {u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}
>>
>> [centos@hadoop002 ~]$ cat s3_test.py
>> import boto3
>> import ujson
>> import arrow
>> import sys
>> import os
>> import getpass
>>
>> os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'
>>
>> # build a unix timestamp from the date/time fields of one record
>> def add_timestamp(dict):
>>     dict['timestamp'] = arrow.get(
>>         int(dict['year']),
>>         int(dict['month']),
>>         int(dict['day']),
>>         int(dict['hour']),
>>         int(dict['minute']),
>>         int(dict['second'])
>>     ).timestamp
>>     return dict
>>
>> # list the matching object keys in the bucket
>> s3_list = []
>> s3 = boto3.resource('s3')
>> my_bucket = s3.Bucket('time-waits-for-no-man')
>> for object in my_bucket.objects.filter(Prefix='1973-01-11'):
>>     s3_list.append(object.key)
>>
>> print("object key")
>> print(s3_list[0])
>>
>> # fetch the object and parse it line by line (one json object per line);
>> # map() returns a plain list on Python 2, so the indexing below is fine
>> s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man', key=s3_list[0])
>> contents = s3obj.get()['Body'].read().decode()
>> meow = contents.splitlines()
>> result_wo_timestamp = map(ujson.loads, meow)
>> result_wo_timestamp[0]
>> result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
>>
>> print("Length of List")
>> print(len(result_wi_timestamp))
>> print("First row")
>> print(result_wi_timestamp[0])
>> print("Last row")
>> print(result_wi_timestamp[86399])
>>
>> On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> pickle errors normally point to a serialisation issue. i am suspecting
>>> something wrong with ur S3 data, but that is just a wild guess...
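>>> one pattern that produces exactly this py4j error ("Method
>>> __getnewargs__([]) does not exist") is a py4j-backed object - the
>>> SparkContext, the sqlContext or a DataFrame - getting captured inside a
>>> function that spark then tries to pickle and ship to the executors. i
>>> might be wrong here, but it is worth checking your closures. a minimal
>>> sketch of the anti-pattern, with made-up names just to illustrate:
>>>
>>>   # BAD: the lambda closes over sqlContext, which wraps a py4j proxy
>>>   # object, so pickling the closure dies probing __getnewargs__
>>>   dfs = rdd.map(lambda path: sqlContext.read.json(path))
>>>
>>>   # OK: the shipped function only references plain python objects
>>>   rows = rdd.map(lambda line: ujson.loads(line))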
"/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>>> line 241, in save_function_tuple >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple >>>> save(element) >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list >>>> self._batch_appends(iter(obj)) >>>> File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends >>>> save(x) >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>>> line 204, in save_function >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>>> line 241, in save_function_tuple >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple >>>> save(element) >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File "/usr/lib64/python2.7/pickle.py", line 600, in save_list >>>> self._batch_appends(iter(obj)) >>>> File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends >>>> save(tmp[0]) >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>>> line 198, in save_function >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", >>>> line 246, in save_function_tuple >>>> File "/usr/lib64/python2.7/pickle.py", line 286, in save >>>> f(self, obj) # Call unbound method with explicit self >>>> File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict >>>> self._batch_setitems(obj.iteritems()) >>>> File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems >>>> save(v) >>>> File "/usr/lib64/python2.7/pickle.py", line 306, in save >>>> rv = reduce(self.proto) >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", >>>> line 933, in __call__ >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", >>>> line 63, in deco >>>> File >>>> "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", >>>> line 316, in get_return_value >>>> py4j.protocol.Py4JError: An error occurred while calling >>>> o33.__getnewargs__. Trace: >>>> py4j.Py4JException: Method __getnewargs__([]) does not exist >>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine. >>>> java:318) >>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine. 
>>>
>>> If you send a snippet of your json content, then everyone on the list
>>> can run the code and try to reproduce it.
>>>
>>> hth
>>> Marco
>>>
>>> On 27 Nov 2016 7:33 pm, "Andrew Holway" <andrew.hol...@otternetworks.de> wrote:
>>>
>>>> I get a slightly different error when not specifying a schema:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
>>>>     df = sqlContext.createDataFrame(foo)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 520, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 360, in _createFromRDD
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 331, in _inferSchema
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1328, in first
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1310, in take
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py", line 941, in runJob
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>>     self.save(obj)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>>     save(tmp[0])
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>>     self._batch_setitems(obj.iteritems())
>>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>>     save(v)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>>     rv = reduce(self.proto)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
>>>> py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>>         at py4j.Gateway.invoke(Gateway.java:272)
>>>>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>         at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <andrew.hol...@otternetworks.de> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can anyone tell me what is causing this error?
>>>>> Spark 2.0.0
>>>>> Python 2.7.5
>>>>>
>>>>> df = sqlContext.createDataFrame(foo, schema)
>>>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
>>>>>     df = sqlContext.createDataFrame(foo, schema)
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 523, in createDataFrame
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2220, in _to_java_object_rdd
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>>>     self.save(obj)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>>     self._batch_appends(iter(obj))
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>>     save(x)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>>     self._batch_appends(iter(obj))
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>>     save(x)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>>     self._batch_appends(iter(obj))
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>>>     save(tmp[0])
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>>>     self._batch_setitems(obj.iteritems())
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>>>     save(v)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>>>     rv = reduce(self.proto)
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
>>>>> py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
>>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>>>         at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>>>         at py4j.Gateway.invoke(Gateway.java:272)
>>>>>         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>>>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>         at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>>>         at java.lang.Thread.run(Thread.java:745)
--
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin