exactly :) thanks Harsh :D

On Thu, Dec 10, 2015 at 3:18 AM, Harsh J <ha...@cloudera.com> wrote:
> and then calling getRowID() in the lambda, because the function gets
> sent to the executor right?

Yes, that is correct (vs. a one-time evaluation, as it was with your
assignment earlier).

On Thu, Dec 10, 2015 at 3:34 AM Pinela <pin...@gmail.com> wrote:

Hey Bryan,

Thanks for the answer ;) I knew it was a basic python/spark-noob thing :)

This also worked:

    def getRowID():
        return datetime.now().strftime("%Y%m%d%H%M%S")

and then calling getRowID() in the lambda, because the function gets sent
to the executor, right?

Thanks again for the quick reply :)

All the best and Happy Holidays.
Jpinela.

On Wed, Dec 9, 2015 at 8:22 PM, Bryan Cutler <cutl...@gmail.com> wrote:

rowid in your code is a variable in the driver, so it is evaluated once and
only its value is sent to words.map. You probably want rowid to be a lambda
itself, so that the value is produced at the time it is evaluated. For
example, with the following:

    data = sc.parallelize([1, 2, 3])

    from datetime import datetime
    rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")

    mdata = data.map(lambda x: (rowid(), x))

    mdata.collect()
    [('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]

    mdata.collect()
    [('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]

here rowid is evaluated whenever an action is called on the RDD, i.e. collect.

On Wed, Dec 9, 2015 at 10:23 AM, jpinela <pin...@gmail.com> wrote:

Hi Guys,
I am sure this is a simple question, but I can't find it in the docs
anywhere. This reads from Flume and writes to HBase (as you can see),
but it has a variable scope problem (I believe).
I have the following code:

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils
    from datetime import datetime

    ssc = StreamingContext(sc, 5)
    conf = {"hbase.zookeeper.quorum": "ubuntu3",
            "hbase.mapred.outputtable": "teste2",
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
    words = lines.map(lambda line: line[1])
    rowid = datetime.now().strftime("%Y%m%d%H%M%S")
    outrdd = words.map(lambda x: (str(1), [rowid, "cf1desc", "col1", x]))
    print("ok 1")
    outrdd.pprint()

    outrdd.foreachRDD(lambda x: x.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv))

    ssc.start()
    ssc.awaitTermination()

The issue is that the rowid variable always stays at the value it had when
the streaming job began. How can I work around this? I tried a function, an
application, nothing worked.
Thank you.
jp
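For completeness, a minimal sketch of the fix discussed in the thread, applied
to the original streaming job: the timestamp comes from a small helper
(getRowID(), as in Pinela's follow-up) that is called inside the map lambda,
so it is evaluated on the executors at processing time instead of once in the
driver. This assumes the same conf, keyConv, and valueConv definitions as in
the original post and an existing SparkContext sc; it is an untested sketch of
the approach, not a verified job.

    from datetime import datetime

    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    def getRowID():
        # Evaluated each time the lambda runs on an executor,
        # not once when the job graph is built in the driver.
        return datetime.now().strftime("%Y%m%d%H%M%S")

    ssc = StreamingContext(sc, 5)

    lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
    words = lines.map(lambda line: line[1])

    # getRowID() is called per record as each batch is processed, so the
    # row id reflects the processing time rather than the job start time.
    outrdd = words.map(lambda x: (str(1), [getRowID(), "cf1desc", "col1", x]))

    # conf, keyConv, valueConv as defined in the original post.
    outrdd.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopDataset(
        conf=conf, keyConverter=keyConv, valueConverter=valueConv))

    ssc.start()
    ssc.awaitTermination()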