rowid from your code is a variable in the driver, so it will be evaluated
once and then only the value is sent to words.map.  You probably want to
have rowid be a lambda itself, so that it will get the value at the time it
is evaluated.  For example if I have the following:

>>> data = sc.parallelize([1,2,3])
>>> from datetime import datetime
>>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>>> data.map(lambda x: (rowid(), x))
>>> mdata = data.map(lambda x: (rowid(), x))
>>> mdata.collect()
[('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>>> mdata.collect()
[('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]

here rowid is evaluated whenever an action is called on the RDD, i.e.
collect

On Wed, Dec 9, 2015 at 10:23 AM, jpinela <pin...@gmail.com> wrote:

> Hi Guys,
> I am sure this is a simple question, but I can't find it in the docs
> anywhere.
> This reads from flume and writes to hbase (as you can see).
> But has a variable scope problem (I believe).
> I have the following code:
>
> *
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.flume import FlumeUtils
> from datetime import datetime
> ssc = StreamingContext(sc, 5)
> conf = {"hbase.zookeeper.quorum": "ubuntu3",
>             "hbase.mapred.outputtable": "teste2",
>             "mapreduce.outputformat.class":
> "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
>             "mapreduce.job.output.key.class":
> "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>             "mapreduce.job.output.value.class":
> "org.apache.hadoop.io.Writable"}
>
>
> keyConv =
>
> "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
> valueConv =
> "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>
> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
> words = lines.map(lambda line: line[1])
> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
> outrdd= words.map(lambda x: (str(1),[rowid,"cf1desc","col1",x]))
> print("ok 1")
> outrdd.pprint()
>
> outrdd.foreachRDD(lambda x:
>
> x.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv))
>
> ssc.start()
> ssc.awaitTermination()*
>
> the issue is that the rowid variable is allways at the point that the
> streaming was began.
> How can I go around this? I tried a function, an application, nothing
> worked.
> Thank you.
> jp
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to