Re: run reduceByKey on huge data in spark

2015-06-30 Thread lisendong
hello, I'm using spark 1.4.2-SNAPSHOT
I'm running in yarn mode :-)

I wonder whether spark.shuffle.memoryFraction or spark.shuffle.manager would help here.
How should I set these parameters?
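
(For reference, one way to set these on the driver before the context is created; this is only a sketch, and the values below are placeholders to experiment with, not tuned recommendations:)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("reduceByKey-huge-data")
  .set("spark.shuffle.manager", "sort")         // "sort" or "hash"
  .set("spark.shuffle.memoryFraction", "0.4")   // fraction of the heap used for shuffle aggregation buffers
val sc = new SparkContext(conf)

The same properties can also be passed on the command line via spark-submit --conf key=value.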
> On Jul 1, 2015, at 1:32 AM, Ted Yu wrote:
> 
> Which Spark release are you using ?
> 
> Are you running in standalone mode ?
> 
> Cheers
> 
> On Tue, Jun 30, 2015 at 10:03 AM, hotdog  > wrote:
> I'm running reduceByKey in spark. My program is the simplest example of
> spark:
> 
> val counts = textFile.flatMap(line => line.split(" "))
>   .repartition(2)
>   .map(word => (word, 1))
>   .reduceByKey(_ + _, 1)
> counts.saveAsTextFile("hdfs://...")
> but it always runs out of memory...
> 
> I'm using 50 servers, 35 executors per server, and 140GB of memory per server.
> 
> The data volume is 8TB: 20 billion documents with about 1000 billion words in
> total, and there will be about 100 million distinct words after the reduce.
> 
> I wonder how to configure Spark for this job, and what values these parameters should take:
> 
> 1. the number of map tasks? (2, for example?)
> 2. the number of reduce tasks? (1, for example?)
> 3. any other parameters?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/run-reduceByKey-on-huge-data-in-spark-tp23546.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: different result from implicit ALS with explicit ALS

2015-02-26 Thread lisendong
okay, I have brought this to the user@list

I don't think the negative pairs should be omitted...


If the scores of all the pairs are 1.0, the result will be worse... I have tried that...


Best Regards, 
Sendong Li
> On Feb 26, 2015, at 10:07 PM, Sean Owen wrote:
> 
> Yes, I mean, do not generate a Rating for these data points. What then?
> 
> Also would you care to bring this to the user@ list? it's kind of interesting.
> 
> On Thu, Feb 26, 2015 at 2:02 PM, lisendong  wrote:
>> I set the score of ‘0’ interaction user-item pair to 0.0
>> the code is as following:
>> 
>> if (ifclick > 0) {
>>score = 1.0;
>> }
>> else {
>>score = 0.0;
>> }
>> return new Rating(user_id, photo_id, score);
>> 
>> both method use the same ratings rdd
>> 
>> because of the same random seed(1 in my case), the result is stable.
>> 
>> 
>> Best Regards,
>> Sendong Li
>> 
>> 
>> On Feb 26, 2015, at 9:53 PM, Sean Owen wrote:
>> 
>> 
>> I see why you say that, yes.
>> 
>> Are you actually encoding the '0' interactions, or just omitting them?
>> I think you should do the latter.
>> 
>> Is the AUC stable over many runs or did you just run once?
>> 
>> On Thu, Feb 26, 2015 at 1:42 PM, lisendong  wrote:
>> 
>> Hi meng, fotero, sowen:
>> 
>> I’m using ALS with spark 1.0.0, the code should be:
>> https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
>> 
>> I think the following two methods should produce the same (or nearly the same) result:
>> 
>> MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01, -1,
>> 1);
>> 
>> MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30, 30,
>> 0.01, -1, 0, 1);
>> 
>> The data I used is a display log; the format is as follows:
>> 
>> user  item  if-click
>> 
>> 
>> 
>> 
>> 
>> 
>> I use 1.0 as the score for click pairs, and 0 as the score for non-click pairs.
>> 
>> In the second method, alpha is set to zero, so the confidence for
>> positive and negative pairs is 1.0 in both cases (right?)
>> 
>> I think the two methods should produce similar results, but the second
>> method's result is much worse (the AUC of the first result is 0.7,
>> but the AUC of the second result is only 0.61).
>> 
>> 
>> I could not understand why, could you help me?
>> 
>> 
>> Thank you very much!
>> 
>> Best Regards,
>> Sendong Li
>> 
>> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



different result from implicit ALS with explicit ALS

2015-02-26 Thread lisendong

I'm using ALS with Spark 1.0.0; the code should be:
https://github.com/apache/spark/blob/branch-1.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

I think the following two methods should produce the same (or nearly the same) result:

MatrixFactorizationModel model = ALS.train(ratings.rdd(), 30, 30, 0.01, -1, 1);

MatrixFactorizationModel model = ALS.trainImplicit(ratings.rdd(), 30, 30, 0.01, -1, 0, 1);


The data I used is a display log; the format is as follows:

user  item  if-click


I use 1.0 as the score for click pairs, and 0 as the score for non-click pairs.

In the second method, alpha is set to zero, so the confidence for
positive and negative pairs is 1.0 in both cases (right?)
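
(For reference, in the standard implicit-feedback formulation that trainImplicit is based on (Hu, Koren, Volinsky), each rating r_ui is turned into a preference p_ui = 1 if r_ui > 0 and 0 otherwise, with confidence

    c_{ui} = 1 + \alpha \, |r_{ui}|

and the loss is summed over all user-item pairs, not only the observed ones:

    \min_{X,Y} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^\top y_i \right)^2
      + \lambda \left( \sum_u \|x_u\|^2 + \sum_i \|y_i\|^2 \right)

So with alpha = 0 every pair indeed gets confidence 1, whereas the explicit ALS loss only covers the (user, item, score) triples that are actually supplied; this is also the point made in the replies quoted elsewhere in this archive.)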

I think the two methods should produce similar results, but the second method's
result is much worse: the AUC of the first result is 0.7, while the AUC of the
second result is only 0.61.


I could not understand why, could you help me?


Thank you very much!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/different-result-from-implicit-ALS-with-explicit-ALS-tp21823.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to clean shuffle write each iteration

2015-03-02 Thread lisendong
I'm using Spark ALS.

I set the iteration number to 30.

And in each iteration, tasks will produce nearly 1TB shuffle write.

To my surprise, this shuffle data is not cleaned until the whole job
finishes, which means I need 30TB of disk to store the shuffle data.


I think that after each iteration, the shuffle data from before the current
iteration could be deleted, right?

How can I do this?
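
(For context, the mechanism discussed in the replies is checkpointing, which truncates the RDD lineage so that older shuffle files are no longer referenced and can be cleaned. A minimal, generic sketch of the idea, not ALS itself (there the checkpoint has to happen inside ALS's own loop); the path and interval are placeholders:)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch"))
sc.setCheckpointDir("hdfs:///user/root/spark-checkpoints")   // must be a reliable store such as HDFS

var current: RDD[(Int, Int)] = sc.parallelize(1 to 1000).map(x => (x % 10, x))
for (iteration <- 1 to 30) {
  // each iteration adds another shuffle dependency to the lineage
  current = current.reduceByKey(_ + _).mapValues(_ + 1)
  if (iteration % 5 == 0) {
    current.checkpoint()   // cut the lineage every few iterations
    current.count()        // force materialization so earlier shuffle files can be dropped
  }
}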



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-clean-shuffle-write-each-iteration-tp21886.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to clean shuffle write each iteration

2015-03-03 Thread lisendong
In ALS, I guess each iteration's RDDs are referenced by the next iteration's
RDD, so none of the shuffle data will be deleted until the ALS job has
finished...

I guess checkpointing could solve my problem; do you know how checkpointing works here?

> On Mar 3, 2015, at 4:18 PM, nitin [via Apache Spark User List] wrote:
> 
> Shuffle write will be cleaned if it is not referenced by any object 
> directly/indirectly. There is a garbage collector written inside spark which 
> periodically checks for weak references to RDDs/shuffle write/broadcast and 
> deletes them. 
> 
> If you reply to this email, your message will be added to the discussion 
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-clean-shuffle-write-each-iteration-tp21886p21889.html
>  
> 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-clean-shuffle-write-each-iteration-tp21886p21890.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

gc time too long when using mllib als

2015-03-03 Thread lisendong
Why is the GC time so long?

I'm using ALS in MLlib, and the garbage collection time is too long
(about 1/3 of the total time).

I have tried some of the measures in the "Tuning Spark" guide, and tried to set the
new-generation memory size, but it still does not help...
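
(For reference, one way to pass GC options to the executors and surface the GC details in the executor logs; the flags below are only examples to experiment with, not recommendations for this workload:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("als-gc-tuning")
  // executor JVM flags; note that the heap size itself (-Xmx) cannot be set here
  .set("spark.executor.extraJavaOptions",
    "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseParallelGC -XX:NewRatio=3")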





Tasks

Task Index | Task ID | Status  | Locality Level | Executor | Launch Time         | Duration | GC Time | Shuffle Read | Write Time | Shuffle Write
1          | 2801    | SUCCESS | PROCESS_LOCAL  | h1.zw    | 2015/03/03 16:35:15 | 8.6 min  | 3.3 min | 1238.3 MB    | 57 ms      | 69.2 MB
0          | 2800    | SUCCESS | PROCESS_LOCAL  | h11.zw   | 2015/03/03 16:35:15 | 6.0 min  | 1.1 min | 1261.0 MB    | 55 ms      | 68.6 MB
2          | 2802    | SUCCESS | PROCESS_LOCAL  | h9.zw    | 2015/03/03 16:35:15 | 5.0 min  | 1.5 min | 834.4 MB     | 60 ms      | 69.6 MB
4          | 2804    | SUCCESS | PROCESS_LOCAL  | h4.zw    | 2015/03/03 16:35:15 | 4.4 min  | 59 s    | 689.8 MB     | 62 ms      | 71.4 MB
3          | 2803    | SUCCESS | PROCESS_LOCAL  | h8.zw    | 2015/03/03 16:35:15 | 4.2 min  | 1.6 min | 803.6 MB     | 66 ms      | 71.5 MB
7          | 2807    | SUCCESS | PROCESS_LOCAL  | h6.zw    | 2015/03/03 16:35:15 | 4.3 min  | 1.4 min | 733.1 MB     | 9 s        | 66.5 MB
6          | 2806    | SUCCESS | PROCESS_LOCAL  | h10.zw   | 2015/03/03 16:35:15 | 6.4 min  | 3.1 min | 950.5 MB     | 68 ms      | 69.3 MB
5          | 2805    | SUCCESS | PROCESS_LOCAL  | h3.zw    | 2015/03/03 16:35:15 | 8.0 min  | 2.7 min | 1132.0 MB    | 64 ms      | 70.3 MB
8          | 2808    | SUCCESS | PROCESS_LOCAL  | h12.zw   | 2015/03/03 16:35:15 | 4.5 min  | 2.2 min | 1304.2 MB    | 60 ms      | 69.4 MB

(The Result Ser Time and Errors columns were empty and are omitted.)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/gc-time-too-long-when-using-mllib-als-tp21891.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark.local.dir leads to "Job cancelled because SparkContext was shut down"

2015-03-03 Thread lisendong
As long as I set "spark.local.dir" to multiple disks, the job fails with the
errors below.
(If I set spark.local.dir to only one directory, the job succeeds...)
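
(For reference, the multi-disk setting that triggers this is just a comma-separated list; the paths below are placeholders for the actual mount points. Note also that on some cluster managers, e.g. YARN, this property is overridden by the cluster's own local-directory settings:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("local-dir-test")
  // scratch space for shuffle files and spilled data, spread over several disks
  .set("spark.local.dir", "/data1/spark-tmp,/data2/spark-tmp,/data3/spark-tmp")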

Exception in thread "main" org.apache.spark.SparkException: Job cancelled
because SparkContext was shut down
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-leads-to-Job-cancelled-because-SparkContext-was-shut-down-tp21894.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark master shut down suddenly

2015-03-04 Thread lisendong
15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
connection and attempting reconnect
15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
shutting down.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to update als in mllib?

2015-03-04 Thread lisendong
I'm using Spark 1.0.0 with Cloudera.

But I want to use the new ALS code, which supports more features, such as the RDD
cache level (MEMORY_ONLY), checkpointing, and so on.

What is the easiest way to use the new ALS code?

I only need the MLlib ALS code, so maybe I don't need to upgrade Spark and MLlib
on all of the cluster machines...

Maybe downloading a new Spark jar and including it in my driver is enough?
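
(One possible direction, strictly as a sketch and an assumption rather than something known to work: keep the cluster's Spark core as a "provided" dependency and bundle a newer spark-mllib into the application's assembly jar. Whether a newer MLlib is binary compatible with an older Spark core is not guaranteed, so this needs to be verified; the version numbers below are only illustrative.)

// build.sbt (hypothetical)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.0.0" % "provided",  // the cluster's Spark, not bundled
  "org.apache.spark" %% "spark-mllib" % "1.3.1"                // newer ALS, bundled into the assembly jar
)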



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-update-als-in-mllib-tp21921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark master shut down suddenly

2015-03-04 Thread lisendong
I'm sorry, but how do I look at the Mesos logs?
Where are they?



> On Mar 4, 2015, at 6:06 PM, Akhil Das wrote:
> 
> You can check in the mesos logs and see whats really happening.
> 
> Thanks
> Best Regards
> 
> On Wed, Mar 4, 2015 at 3:10 PM, lisendong  <mailto:lisend...@163.com>> wrote:
> 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
> from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
> connection and attempting reconnect
> 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
> 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
> 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
> shutting down.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
>  
> <http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



why my YoungGen GC takes so long time?

2015-03-05 Thread lisendong
I found that some of my tasks spend a very long time in young-generation GC. I set
the young-generation size to about 1.5G, and I wonder why it still takes so long.
Not all tasks take such a long time; only about 1% of them do...


180.426: [GC [PSYoungGen: 9916105K->1676785K(14256640K)]
26201020K->18690057K(53403648K), 17.3581500 secs] [Times: user=104.48
sys=152.56, real=17.36 secs] 
198.986: [GC [PSYoungGen: 10724337K->1664837K(14881280K)]
27737609K->20205003K(54028288K), 55.0331460 secs] [Times: user=47.67
sys=776.67, real=55.02 secs] 
255.339: [GC [PSYoungGen: 11605317K->1550792K(14632960K)]
30145483K->21580959K(53779968K), 187.7893060 secs] [Times: user=110.10
sys=2704.33, real=187.76 secs] 
444.569: [GC [PSYoungGen: 11491272K->988109K(15197696K)]
31521439K->22370228K(54344704K), 366.2677820 secs] [Times: user=78.76
sys=5193.95, real=366.21 secs] 
812.450: [GC [PSYoungGen: 11704781K->1087351K(15092736K)]
33086900K->23295623K(54239744K), 163.0328770 secs] [Times: user=97.71
sys=2134.11, real=163.01 secs] 
977.207: [GC [PSYoungGen: 11804023K->1058228K(15470592K)]
34012295K->24294801K(54617600K), 176.5011980 secs] [Times: user=106.06
sys=1700.11, real=176.47 secs] 
1155.439: [GC [PSYoungGen: 12288948K->1248832K(15333888K)]
35525521K->25495802K(54480896K), 123.5496760 secs] [Times: user=59.09
sys=940.54, real=123.53 secs] 
1280.796: [GC [PSYoungGen: 12479552K->1097796K(15785472K)]
36726522K->26527988K(54932480K), 157.1727550 secs] [Times: user=11.14
sys=1376.93, real=157.15 secs] 
1439.923: [GC [PSYoungGen: 12953668K->1178863K(15644160K)]
38383860K->27637944K(54791168K), 119.3690200 secs] [Times: user=107.54
sys=937.40, real=119.35 secs] 
1561.236: [GC [PSYoungGen: 13034735K->1190849K(16093696K)]
39493816K->28787207K(55240704K), 159.1091770 secs] [Times: user=237.16
sys=1653.59, real=159.08 secs] 
1722.406: [GC [PSYoungGen: 13651905K->1245193K(15941120K)]
41248263K->29995801K(55088128K), 168.0231330 secs] [Times: user=81.20
sys=2203.72, real=168.00 secs] 
1892.731: [GC [PSYoungGen: 13706249K->1161132K(16381952K)]
42456857K->31117445K(55528960K), 162.0598100 secs] [Times: user=126.67
sys=1689.32, real=162.04 secs] 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-my-YoungGen-GC-takes-so-long-time-tp21941.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



what are the types of tasks when running ALS iterations

2015-03-08 Thread lisendong
You see, the core of ALS 1.0.0 is the following code.
There should be flatMap and groupByKey tasks when running the ALS iterations, right?
But when I run the ALS iterations, there are ONLY flatMap tasks...
Do you know why?

  private def updateFeatures(
      products: RDD[(Int, Array[Array[Double]])],
      productOutLinks: RDD[(Int, OutLinkBlock)],
      userInLinks: RDD[(Int, InLinkBlock)],
      partitioner: Partitioner,
      rank: Int,
      lambda: Double,
      alpha: Double,
      YtY: Option[Broadcast[DoubleMatrix]]): RDD[(Int, Array[Array[Double]])] = {
    val numBlocks = products.partitions.size
    productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
      val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
      for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
        if (outLinkBlock.shouldSend(p)(userBlock)) {
          toSend(userBlock) += factors(p)
        }
      }
      toSend.zipWithIndex.map { case (buf, idx) => (idx, (bid, buf.toArray)) }
    }.groupByKey(new HashPartitioner(numBlocks))
      // Note: the 1.0.0 ALS code has a bug here: that version used the partitioner
      // passed in, which does not take effect and leads to data skew.
      .join(userInLinks)
      .mapValues { case (messages, inLinkBlock) =>
        updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
      }
  }




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-types-of-tasks-when-running-ALS-iterations-tp21966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
I have updated my Spark source code to 1.3.1.

the checkpoint works well. 
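
(For context, the checkpoint() calls inside ALS only work if a checkpoint directory has been set on the SparkContext in the driver; a one-line sketch with a hypothetical path:)

sc.setCheckpointDir("hdfs:///user/root/spark-checkpoints")   // required before any RDD.checkpoint(); must be a reliable store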

BUT the shuffle data still cannot be deleted automatically... the disk usage is
still 30TB...

I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.

Do you know how to solve my problem?

Sendong Li



> On Mar 31, 2015, at 12:11 AM, Xiangrui Meng wrote:
> 
> setCheckpointInterval was added in the current master and branch-1.3. Please 
> help check whether it works. It will be included in the 1.3.1 and 1.4.0 
> release. -Xiangrui
> 
> On Mon, Mar 30, 2015 at 7:27 AM, lisendong  <mailto:lisend...@163.com>> wrote:
> hi, xiangrui:
> I found that the ALS of Spark 1.3.0 forgets to do checkpoint() in explicit ALS:
> the code is :
> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>  
> <https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala>
> 
> 
> The checkpoint is very important in my situation, because my task produces
> 1TB of shuffle data in each iteration; if the shuffle data is not deleted
> after each iteration (using checkpoint()), the task will produce 30TB of
> data…
> 
> 
> So I changed the ALS code and re-compiled it myself, but it seems the
> checkpoint does not take effect, and the task still occupies 30TB of disk…
> (I only added two lines to ALS.scala):
> 
> 
> 
> 
> 
> and the driver's log seems strange; why is the log printed together...
> 
> 
> thank you very much!
> 
> 
>> On Feb 26, 2015, at 11:33 PM, 163 <lisend...@163.com> wrote:
>> 
>> Thank you very much for your opinion:)
>> 
>> In our case, maybe it's dangerous to treat un-observed items as negative
>> interactions (although we could give them a small confidence, I think they
>> are still not reliable...)
>> 
>> I will do more experiments and give you feedback:)
>> 
>> Thank you;)
>> 
>> 
>>> On Feb 26, 2015, at 23:16, Sean Owen <so...@cloudera.com> wrote:
>>> 
>>> I believe that's right, and is what I was getting at. yes the implicit
>>> formulation ends up implicitly including every possible interaction in
>>> its loss function, even unobserved ones. That could be the difference.
>>> 
>>> This is mostly an academic question though. In practice, you have
>>> click-like data and should be using the implicit version for sure.
>>> 
>>> However you can give negative implicit feedback to the model. You
>>> could consider no-click as a mild, observed, negative interaction.
>>> That is: supply a small negative value for these cases. Unobserved
>>> pairs are not part of the data set. I'd be careful about assuming the
>>> lack of an action carries signal.
>>> 
>>>> On Thu, Feb 26, 2015 at 3:07 PM, 163 >>> <mailto:lisend...@163.com>> wrote:
>>>> oh my god, I think I understood...
>>>> In my case, there are three kinds of user-item pairs:
>>>> 
>>>> Display and click pair(positive pair)
>>>> Display but no-click pair(negative pair)
>>>> No-display pair(unobserved pair)
>>>> 
>>>> Explicit ALS only consider the first and the second kinds
>>>> But implicit ALS consider all the three kinds of pair(and consider the 
>>>> third
>>>> kind as the second pair, because their preference value are all zero and
>>>> confidence are all 1)
>>>> 
>>>> So the result are different. right?
>>>> 
>>>> Could you please give me some advice, which ALS should I use?
>>>> If I use the implicit ALS, how to distinguish the second and the third kind
>>>> of pair:)
>>>> 
>>>> My opinion is in my case, I should use explicit ALS ...
>>>> 
>>>> Thank you so much
>>>> 
>>>> On Feb 26, 2015, at 22:41, Xiangrui Meng <m...@databricks.com> wrote:
>>>> 
>>>> Lisen, did you use all m-by-n pairs during training? Implicit model
>>>> penalizes unobserved ratings, while explicit model doesn't. -Xiangrui
>>>> 
>>>>> On Feb 26, 2015 6:26 AM, "Sean Owen" >>>> <mailto:so...@cloudera.com>> wrote:
>>>>> 
>>>>> +user
>>>>> 
>>>>>> On Thu, Feb 26, 2015 at 2:26 PM, Sean Owen >>>>> <mailto:so...@cloudera.com>> wrote:
>>>>>> 
>>>>>> I t

Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
Thank you, @GuoQiang
I will try to add runGC() to the ALS.scala, and if it works for deleting the 
shuffle data, I will tell you :-)



> On Mar 31, 2015, at 4:47 PM, GuoQiang Li wrote:
> 
> You can try to enforce garbage collection:
> 
> /** Run GC and make sure it actually has run */
> def runGC() {
>   val weakRef = new WeakReference(new Object())
>   val startTime = System.currentTimeMillis
>   System.gc() // Make a best effort to run the garbage collection. It 
> *usually* runs GC.
>   // Wait until a weak reference object has been GCed
>   System.runFinalization()
>   while (weakRef.get != null) {
> System.gc()
> System.runFinalization()
> Thread.sleep(200)
> if (System.currentTimeMillis - startTime > 10000) { // give up after 10 seconds
>   throw new Exception("automatically cleanup error")
> }
>   }
> }
> 
> 
> ------------------ Original Message ------------------
> From: "lisendong" <lisend...@163.com>
> Date: Tuesday, Mar 31, 2015, 3:47 PM
> To: "Xiangrui Meng" <men...@gmail.com>
> Cc: "Xiangrui Meng" <m...@databricks.com>; "user" <user@spark.apache.org>;
> "Sean Owen" <so...@cloudera.com>; "GuoQiang Li" <wi...@qq.com>
> Subject: Re: different result from implicit ALS with explicit ALS
> 
> I have updated my Spark source code to 1.3.1.
> 
> the checkpoint works well. 
> 
> BUT the shuffle data still cannot be deleted automatically... the disk usage
> is still 30TB...
> 
> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
> 
> Do you know how to solve my problem?
> 
> Sendong Li
> 
> 
> 
>> On Mar 31, 2015, at 12:11 AM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> setCheckpointInterval was added in the current master and branch-1.3. Please 
>> help check whether it works. It will be included in the 1.3.1 and 1.4.0 
>> release. -Xiangrui
>> 
>> On Mon, Mar 30, 2015 at 7:27 AM, lisendong > <mailto:lisend...@163.com>> wrote:
>> hi, xiangrui:
>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>> the code is :
>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>>  
>> <https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala>
>> 
>> 
>> The checkpoint is very important in my situation, because my task produces
>> 1TB of shuffle data in each iteration; if the shuffle data is not deleted
>> after each iteration (using checkpoint()), the task will produce 30TB of
>> data...
>> 
>> 
>> So I changed the ALS code and re-compiled it myself, but it seems the
>> checkpoint does not take effect, and the task still occupies 30TB of disk...
>> (I only added two lines to ALS.scala):
>> 
>> 
>> 
>> 
>> 
>> and the driver's log seems strange; why is the log printed together...
>> 
>> 
>> thank you very much!
>> 
>> 
>>> On Feb 26, 2015, at 11:33 PM, 163 <lisend...@163.com> wrote:
>>> 
>>> Thank you very much for your opinion:)
>>> 
>>> In our case, maybe it 's dangerous to treat un-observed item as negative 
>>> interaction(although we could give them small confidence, I think they are 
>>> still incredible...)
>>> 
>>> I will do more experiments and give you feedback:)
>>> 
>>> Thank you;)
>>> 
>>> 
>>>> On Feb 26, 2015, at 23:16, Sean Owen <so...@cloudera.com> wrote:
>>>> 
>>>> I believe that's right, and is what I was getting at. yes the implicit
>>>> formulation ends up implicitly including every possible interaction in
>>>> its loss function, even unobserved ones. That could be the difference.
>>>> 
>>>> This is mostly an academic question though. In practice, you have
>>>> click-like data and should be using the implicit version for sure.
>>>> 
>>>> However you can give negative implicit feedback to the model. You
>>>> could consider no-click as a mild, observed, negative interaction.
>>>> That is: supply a small negative value for these cases. Unobserved
>>>> pairs are not part of the data set. I'd be careful about assuming the
>>>> lack of an action carries signal.
>>>> 
>>>

Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
GuoQiang's method works very well...

it only takes 1TB disk now.

thank you very much!



> On Mar 31, 2015, at 4:47 PM, GuoQiang Li wrote:
> 
> You can try to enforce garbage collection:
> 
> /** Run GC and make sure it actually has run */
> def runGC() {
>   val weakRef = new WeakReference(new Object())
>   val startTime = System.currentTimeMillis
>   System.gc() // Make a best effort to run the garbage collection. It 
> *usually* runs GC.
>   // Wait until a weak reference object has been GCed
>   System.runFinalization()
>   while (weakRef.get != null) {
> System.gc()
> System.runFinalization()
> Thread.sleep(200)
> if (System.currentTimeMillis - startTime > 10000) { // give up after 10 seconds
>   throw new Exception("automatically cleanup error")
> }
>   }
> }
> 
> 
> ------------------ Original Message ------------------
> From: "lisendong" <lisend...@163.com>
> Date: Tuesday, Mar 31, 2015, 3:47 PM
> To: "Xiangrui Meng" <men...@gmail.com>
> Cc: "Xiangrui Meng" <m...@databricks.com>; "user" <user@spark.apache.org>;
> "Sean Owen" <so...@cloudera.com>; "GuoQiang Li" <wi...@qq.com>
> Subject: Re: different result from implicit ALS with explicit ALS
> 
> I have updated my Spark source code to 1.3.1.
> 
> the checkpoint works well. 
> 
> BUT the shuffle data still cannot be deleted automatically... the disk usage
> is still 30TB...
> 
> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
> 
> Do you know how to solve my problem?
> 
> Sendong Li
> 
> 
> 
>> On Mar 31, 2015, at 12:11 AM, Xiangrui Meng <men...@gmail.com> wrote:
>> 
>> setCheckpointInterval was added in the current master and branch-1.3. Please 
>> help check whether it works. It will be included in the 1.3.1 and 1.4.0 
>> release. -Xiangrui
>> 
>> On Mon, Mar 30, 2015 at 7:27 AM, lisendong > <mailto:lisend...@163.com>> wrote:
>> hi, xiangrui:
>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>> the code is :
>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>>  
>> <https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala>
>> 
>> 
>> The checkpoint is very important in my situation, because my task produces
>> 1TB of shuffle data in each iteration; if the shuffle data is not deleted
>> after each iteration (using checkpoint()), the task will produce 30TB of
>> data...
>> 
>> 
>> So I changed the ALS code and re-compiled it myself, but it seems the
>> checkpoint does not take effect, and the task still occupies 30TB of disk...
>> (I only added two lines to ALS.scala):
>> 
>> 
>> 
>> 
>> 
>> and the driver's log seems strange; why is the log printed together...
>> 
>> 
>> thank you very much!
>> 
>> 
>>> On Feb 26, 2015, at 11:33 PM, 163 <lisend...@163.com> wrote:
>>> 
>>> Thank you very much for your opinion:)
>>> 
>>> In our case, maybe it 's dangerous to treat un-observed item as negative 
>>> interaction(although we could give them small confidence, I think they are 
>>> still incredible...)
>>> 
>>> I will do more experiments and give you feedback:)
>>> 
>>> Thank you;)
>>> 
>>> 
>>>> On Feb 26, 2015, at 23:16, Sean Owen <so...@cloudera.com> wrote:
>>>> 
>>>> I believe that's right, and is what I was getting at. yes the implicit
>>>> formulation ends up implicitly including every possible interaction in
>>>> its loss function, even unobserved ones. That could be the difference.
>>>> 
>>>> This is mostly an academic question though. In practice, you have
>>>> click-like data and should be using the implicit version for sure.
>>>> 
>>>> However you can give negative implicit feedback to the model. You
>>>> could consider no-click as a mild, observed, negative interaction.
>>>> That is: supply a small negative value for these cases. Unobserved
>>>> pairs are not part of the data set. I'd be careful about assuming the
>>>> lack of an action carries signal.
>>>> 
>>>>> On Thu, Feb 26, 2015 at

Re: different result from implicit ALS with explicit ALS

2015-03-31 Thread lisendong
In my experiment, if I do not call gc() explicitly, the shuffle files are not
cleaned until the whole job finishes... I don't know why; maybe the RDDs are
not GCed implicitly.
In my situation, a full GC in the driver takes about 10 seconds, so I start a
thread in the driver that triggers a GC every 120 seconds, like this:

while (true) {
    System.gc();               // force a full GC in the driver so weakly-referenced shuffle data can be cleaned
    Thread.sleep(120 * 1000);  // every 120 seconds
}


it works well now.
Do you have more elegant ways to clean the shuffle files?

Best Regards,
Sendong Li



> On Apr 1, 2015, at 5:09 AM, Xiangrui Meng wrote:
> 
> Hey Guoqiang and Sendong,
> 
> Could you comment on the overhead of calling gc() explicitly? The shuffle 
> files should get cleaned in a few seconds after checkpointing, but it is 
> certainly possible to accumulate TBs of files in a few seconds. In this 
> case, calling gc() may work the same as waiting for a few seconds after each 
> checkpoint. Is it correct?
> 
> Best,
> Xiangrui
> 
> On Tue, Mar 31, 2015 at 8:58 AM, lisendong  <mailto:lisend...@163.com>> wrote:
> guoqiang ’s method works very well …
> 
> it only takes 1TB disk now.
> 
> thank you very much!
> 
> 
> 
>> On Mar 31, 2015, at 4:47 PM, GuoQiang Li <wi...@qq.com> wrote:
>> 
>> You can try to enforce garbage collection:
>> 
>> /** Run GC and make sure it actually has run */
>> def runGC() {
>>   val weakRef = new WeakReference(new Object())
>>   val startTime = System.currentTimeMillis
>>   System.gc() // Make a best effort to run the garbage collection. It 
>> *usually* runs GC.
>>   // Wait until a weak reference object has been GCed
>>   System.runFinalization()
>>   while (weakRef.get != null) {
>> System.gc()
>> System.runFinalization()
>> Thread.sleep(200)
>> if (System.currentTimeMillis - startTime > 10000) { // give up after 10 seconds
>>   throw new Exception("automatically cleanup error")
>> }
>>   }
>> }
>> 
>> 
>> ------------------ Original Message ------------------
>> From: "lisendong" <lisend...@163.com>
>> Date: Tuesday, Mar 31, 2015, 3:47 PM
>> To: "Xiangrui Meng" <men...@gmail.com>
>> Cc: "Xiangrui Meng" <m...@databricks.com>; "user" <user@spark.apache.org>;
>> "Sean Owen" <so...@cloudera.com>; "GuoQiang Li" <wi...@qq.com>
>> Subject: Re: different result from implicit ALS with explicit ALS
>> 
>> I have update my spark source code to 1.3.1.
>> 
>> the checkpoint works well. 
>> 
>> BUT the shuffle data still could not be delete automatically…the disk usage 
>> is still 30TB…
>> 
>> I have set the spark.cleaner.referenceTracking.blocking.shuffle to true.
>> 
>> Do you know how to solve my problem?
>> 
>> Sendong Li
>> 
>> 
>> 
>>> On Mar 31, 2015, at 12:11 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>> 
>>> setCheckpointInterval was added in the current master and branch-1.3. 
>>> Please help check whether it works. It will be included in the 1.3.1 and 
>>> 1.4.0 release. -Xiangrui
>>> 
>>> On Mon, Mar 30, 2015 at 7:27 AM, lisendong >> <mailto:lisend...@163.com>> wrote:
>>> hi, xiangrui:
>>> I found the ALS of spark 1.3.0 forget to do checkpoint() in explicit ALS:
>>> the code is :
>>> https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
>>>  
>>> <https://github.com/apache/spark/blob/db34690466d67f9c8ac6a145fddb5f7ea30a8d8d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala>
>>> 
>>> 
>>> the checkpoint is very important in my situation, because my task will 
>>> produce 1TB shuffle data in each iteration, it the shuffle data is not 
>>> deleted in each iteration(using checkpoint()), the task will produce 30TB 
>>> data…
>>> 
>>> 
>>> So I change the ALS code, and re-compile by myself, but it seems the 
>>> checkpoint does not take effects, and the task still occupy 30TB disk… ( I 
>>> only add two lines to the ALS.scala) :
>>> 
>>> 
>>> 
>>> 
>>> 
>>> and the driver’s log seems strange, why the log is printed together...
>>> 
>>> 
>>> thank you very much!
>>> 
>>> 
>>>> On Feb 26, 2015, at 11:33 PM, 163 <lisend...@163.com> wrote:
>>>> 
>>>> Thank you very much for your opinion:)
>>>> 
>>>> In our case

Re: there are about 50% all-zero vector in the als result

2015-04-02 Thread lisendong
NO, I'm referring to the result.
You mean there might be that many zero features in the ALS result?

I think it is not related to the initial state, but I do not know why the
percentage of zero vectors is so high (around 50%).


> On Apr 2, 2015, at 6:08 PM, Sean Owen wrote:
> 
> You're referring to the initialization, not the result, right? It's possible 
> that the resulting weight vectors are sparse although this looks surprising 
> to me. But it is not related to the initial state, right?
> 
> On Thu, Apr 2, 2015 at 10:43 AM, lisendong  <mailto:lisend...@163.com>> wrote:
> I found that there are about 50% all-zero vectors in the ALS result ( both in 
> userFeatures and productFeatures)
> 
> the result looks like this:
> 
> 
> 
> 
> I looked into the ALS.scala, the user and product factors seems to be 
> initialized by a gaussian distribution, so it should not be all-zero vector, 
> right?
> 
> 
> [attachment: PastedGraphic-1.tiff (607K)]


Re: there are about 50% all-zero vector in the als result

2015-04-02 Thread lisendong
Oh, I found the reason, according to the ALS optimization formula:

If all of a user's ratings are zero, that is, R(i, Ii) is a zero matrix, then
the final feature vector of this user will be the all-zero vector...
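
(For reference, the standard explicit-ALS user update behind this argument, up to the exact regularization scaling Spark uses; Y_{I_u} holds the factors of the items rated by user u and r_u the corresponding ratings, i.e. the R(i, Ii) block above:

    x_u = \left( Y_{I_u}^\top Y_{I_u} + \lambda I \right)^{-1} Y_{I_u}^\top r_u

so if r_u is all zeros, then Y_{I_u}^\top r_u = 0 and x_u = 0.)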


> On Apr 2, 2015, at 6:08 PM, Sean Owen wrote:
> 
> You're referring to the initialization, not the result, right? It's possible 
> that the resulting weight vectors are sparse although this looks surprising 
> to me. But it is not related to the initial state, right?
> 
> On Thu, Apr 2, 2015 at 10:43 AM, lisendong  <mailto:lisend...@163.com>> wrote:
> I found that there are about 50% all-zero vectors in the ALS result ( both in 
> userFeatures and productFeatures)
> 
> the result looks like this:
> 
> 
> 
> 
> I looked into the ALS.scala, the user and product factors seems to be 
> initialized by a gaussian distribution, so it should not be all-zero vector, 
> right?
> 
> 
> [attachment: PastedGraphic-1.tiff (607K)]


Re: there are about 50% all-zero vector in the als result

2015-04-02 Thread lisendong
yes! 
thank you very much:-)

> On Apr 2, 2015, at 7:13 PM, Sean Owen wrote:
> 
> Right, I asked because in your original message, you were looking at
> the initialization to a random vector. But that is the initial state,
> not final state.
> 
> On Thu, Apr 2, 2015 at 11:51 AM, lisendong  wrote:
>> NO, I’m referring to the result.
>> you means there might be so many zero features in the als result ?
>> 
>> I think it is not related to the initial state, but I do not know why the
>> percent of zero-vector  is so high(50% around)
> 
>>> I looked into the ALS.scala, the user and product factors seems to be
>>> initialized by a gaussian distribution, so it should not be all-zero vector,
>>> right?



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



union each streaming window into a static rdd and use the static rdd periodically

2015-05-06 Thread lisendong
the pseudo code :

object myApp {
  // accumulated state; in practice a pair RDD, since join needs key-value pairs
  var myStaticRDD: RDD[Int] = _

  def main() {
    ...  // init the StreamingContext and get two DStreams (streamA and streamB)
         // from two HDFS paths

    // complex transformation combining the two DStreams
    val new_stream = streamA.transformWith(streamB, (a, b, t) => {
        a.join(b).map(...)
      }
    )

    // join each of new_stream's RDDs with myStaticRDD and keep the result
    new_stream.foreachRDD { rdd =>
      myStaticRDD = myStaticRDD.join(rdd)
    }

    // do complex model training every two hours (pseudo-code; see the sketch below)
    if (hour is 0, 2, 4, 6...) {
      model_training(myStaticRDD)   // will take about 1 hour
    }
  }
}


I don't know how to write the code so that the model is trained every two
hours using that moment's myStaticRDD, while the streaming job keeps running
normally at the same time during the (roughly one-hour) training...
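
(One possible shape for the scheduling part, reusing the names from the pseudo-code above and strictly as a rough sketch: check the batch time inside foreachRDD and, once two hours have passed since the last training, snapshot the current RDD and train on a separate thread so the streaming batches are not blocked. model_training, the caching policy, and the exact snapshot handling are placeholders.)

@volatile var lastTrainingMs = 0L

new_stream.foreachRDD { (rdd, time) =>
  myStaticRDD = myStaticRDD.join(rdd).cache()

  if (time.milliseconds - lastTrainingMs >= 2 * 60 * 60 * 1000L) {
    lastTrainingMs = time.milliseconds
    val snapshot = myStaticRDD               // the state used for this training round
    new Thread(new Runnable {
      override def run(): Unit = model_training(snapshot)   // placeholder, takes ~1 hour
    }).start()
  }
}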




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/union-eatch-streaming-window-into-a-static-rdd-and-use-the-static-rdd-periodicity-tp22783.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to load some of the files in a dir and monitor new file in that dir in spark streaming without missing?

2015-05-11 Thread lisendong

I have one hdfs dir, which contains many files:

/user/root/1.txt
/user/root/2.txt
/user/root/3.txt
/user/root/4.txt


and there is a daemon process which adds one file per minute to this dir.
(e.g., 5.txt, 6.txt, 7.txt...)

I want to start a Spark Streaming job that loads 3.txt and 4.txt and then
detects all the new files created after 4.txt.

Please note that because these files are large, processing them takes a long
time. So if I process 3.txt and 4.txt before launching the streaming task,
5.txt and 6.txt may be produced into this directory while 3.txt and 4.txt are
still being processed, and when the streaming task starts, 5.txt and 6.txt
will be missed, because it will only process new files (from 7.txt onward).

I'm not sure whether I have described the problem clearly; if you have any
questions, please ask.
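
(One direction worth noting, strictly as a rough sketch: fileStream takes a path filter and a newFilesOnly flag. With newFilesOnly = false, files whose modification time still falls inside the stream's remembered window are also picked up at start-up, so 3.txt and 4.txt would only be covered if they were written recently enough; genuinely old files still need a separate batch pass. ssc is assumed to be the StreamingContext.)

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// pick up recent existing files (subject to the mod-time window) as well as new ones
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "/user/root",
    (p: Path) => true,   // or a filter that skips files that were already processed
    false                // newFilesOnly = false
  ).map(_._2.toString)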



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-load-some-of-the-files-in-a-dir-and-monitor-new-file-in-that-dir-in-spark-streaming-without-m-tp22841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to monitor multi directories in spark streaming task

2015-05-13 Thread lisendong
But in fact the directories are not all ready when my task starts.

for example:

/user/root/2015/05/11/data.txt
/user/root/2015/05/12/data.txt
/user/root/2015/05/13/data.txt

like this. 

and one new directory is created each day.

How can I create a new DStream for tomorrow's new
directory (/user/root/2015/05/13/)?
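
(For reference, the per-directory union that the reply below suggests looks roughly like this; it only covers directories known when the context starts, which is exactly why the daily new directory is still a problem. ssc is assumed to be the StreamingContext.)

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val dirs = Seq("/user/root/2015/05/11", "/user/root/2015/05/12", "/user/root/2015/05/13")
val perDirStreams = dirs.map { dir =>
  ssc.fileStream[LongWritable, Text, TextInputFormat](dir, (p: Path) => true, false)
    .map(_._2.toString)
}
val allLines = ssc.union(perDirStreams)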


> On May 13, 2015, at 4:59 PM, Ankur Chauhan wrote:
> 
> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA1
> 
> I would suggest creating one DStream per directory and then using
> StreamingContext#union(...) to get a union DStream.
> 
> - -- Ankur
> 
> On 13/05/2015 00:53, hotdog wrote:
>> I want to use use fileStream in spark streaming to monitor multi
>> hdfs directories, such as:
>> 
>> val list_join_action_stream = ssc.fileStream[LongWritable, Text, 
>> TextInputFormat]("/user/root/*/*", check_valid_file(_), 
>> false).map(_._2.toString).print
>> 
>> 
>> By the way, I could not understand the meaning of the three classes:
>> LongWritable, Text, TextInputFormat
>> 
>> but it doesn't work...
>> 
>> 
>> 
>> -- View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-monitor-mul
> ti-directories-in-spark-streaming-task-tp22863.html
>> 
>> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> 
>> 
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> -BEGIN PGP SIGNATURE-
> 
> iQEcBAEBAgAGBQJVUxJ1AAoJEOSJAMhvLp3L2f4IAKK+ouQ2VD7H6s/5w/YGbt2P
> uBGJPQ92Hb5REq3f4gK4YecygtAlSAwsqXGCoAaaoPAC7vUMs9RM+slqse1gmUPU
> pbORTIB9dv3iVxjPtZ6R8EX14BAlxcIOR6ni2RBHuQTL+dgIEUekmCg0IhFa5lVF
> Kt5in8rY5PSnX5l/dX9Yu8LI3uC4TLQ+eJXjjOGXoCHys+SaZWJckA3gVcF9GQdB
> dwdhv4UCIYVFj3QIVlLf0+B8FgA0DnRfBC+5ZfS88gcWMc4065sDdx5LkySy4oZB
> tB8IpC4yaY3Mqiu8jdvhcw+SevlYan5YkkkutSvKH7nL/0d1WIkEkHxPBjRqAmY=
> =U0oQ
> -END PGP SIGNATURE-
> <0x6D461C4A.asc><0x6D461C4A.asc.sig>



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to read lz4 compressed data using fileStream of spark streaming?

2015-05-14 Thread lisendong
It still doesn't work...
The StreamingContext detects the new file, but it shows:
ERROR dstream.FileInputDStream: File 
hdfs://nameservice1/sandbox/hdfs/list_join_action/2015_05_14_20_stream_1431605640.lz4
 has no data in it. Spark Streaming can only ingest files that have been 
"moved" to the directory assigned to the file stream. Refer to the streaming 
programming guide for more details.

but the file indeed has many lines...

> On May 14, 2015, at 4:00 PM, Akhil Das wrote:
> 
> Here's 
> <https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java>
>  the class. You can read more here 
> https://github.com/twitter/hadoop-lzo#maven-repository 
> <https://github.com/twitter/hadoop-lzo#maven-repository>
> 
> Thanks
> Best Regards
> 
> On Thu, May 14, 2015 at 1:22 PM, lisendong  <mailto:lisend...@163.com>> wrote:
> Where is this LzoTextInputFormat class?
> What is the Maven dependency?
> 
> 
>> On May 14, 2015, at 3:40 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>> 
>> That's because you are using TextInputFormat i think, try with 
>> LzoTextInputFormat like:
>> 
>> val list_join_action_stream = ssc.fileStream[LongWritable, Text,
>> com.hadoop.mapreduce.LzoTextInputFormat](gc.input_dir, (t: Path) => true, 
>> false).map(_._2.toString)
>> 
>> Thanks
>> Best Regards
>> 
>> On Thu, May 14, 2015 at 1:04 PM, lisendong > <mailto:lisend...@163.com>> wrote:
>> I do have an action on the DStream,
>> because when I put a text file into HDFS it runs normally, but if I put
>> an lz4 file, it does nothing.
>>> On May 14, 2015, at 3:32 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>> 
>>> What do you mean by not detected? may be you forgot to trigger some action 
>>> on the stream to get it executed. Like:
>>> 
>>> val list_join_action_stream = ssc.fileStream[LongWritable, Text,
>>> TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
>>> 
>>> list_join_action_stream.count().print()
>>> 
>>> 
>>> 
>>> Thanks
>>> Best Regards
>>> 
>>> On Wed, May 13, 2015 at 7:18 PM, hotdog >> <mailto:lisend...@163.com>> wrote:
>>> in spark streaming, I want to use fileStream to monitor a directory. But the
>>> files in that directory are compressed using lz4. So the new lz4 files are
>>> not detected by the following code. How to detect these new files?
>>> 
>>> val list_join_action_stream = ssc.fileStream[LongWritable, Text,
>>> TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-read-lz4-compressed-data-using-fileStream-of-spark-streaming-tp22868.html
>>>  
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/how-to-read-lz4-compressed-data-using-fileStream-of-spark-streaming-tp22868.html>
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>>> <http://nabble.com/>.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>> <mailto:user-h...@spark.apache.org>
>>> 
>>> 
>> 
>> 
> 
>