it still doesn't work… the StreamingContext does detect the new file, but it shows:

ERROR dstream.FileInputDStream: File hdfs://nameservice1/sandbox/hdfs/list_join_action/2015_05_14_20_stream_1431605640.lz4 has no data in it. Spark Streaming can only ingest files that have been "moved" to the directory assigned to the file stream. Refer to the streaming programming guide for more details.
but the file does in fact contain many lines...

> On May 14, 2015, at 4:00 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
> Here's the class:
> https://github.com/twitter/hadoop-lzo/blob/master/src/main/java/com/hadoop/mapreduce/LzoTextInputFormat.java
>
> You can read more here:
> https://github.com/twitter/hadoop-lzo#maven-repository
>
> Thanks
> Best Regards
>
> On Thu, May 14, 2015 at 1:22 PM, lisendong <lisend...@163.com> wrote:
>
> Where is the LzoTextInputFormat class? What is the Maven dependency?
>
>> On May 14, 2015, at 3:40 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>> That's because you are using TextInputFormat, I think. Try LzoTextInputFormat instead:
>>
>> val list_join_action_stream = ssc.fileStream[LongWritable, Text,
>>   com.hadoop.mapreduce.LzoTextInputFormat](gc.input_dir, (t: Path) => true,
>>   false).map(_._2.toString)
>>
>> Thanks
>> Best Regards
>>
>> On Thu, May 14, 2015 at 1:04 PM, lisendong <lisend...@163.com> wrote:
>>
>> I do have an action on the DStream: when I put a text file into HDFS, it runs normally, but if I put an lz4 file, it does nothing.
>>
>>> On May 14, 2015, at 3:32 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>> What do you mean by "not detected"? Maybe you forgot to trigger an action on the stream to get it executed, like:
>>>
>>> val list_join_action_stream = ssc.fileStream[LongWritable, Text,
>>>   TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
>>>
>>> list_join_action_stream.count().print()
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, May 13, 2015 at 7:18 PM, hotdog <lisend...@163.com> wrote:
>>>
>>> In Spark Streaming, I want to use fileStream to monitor a directory. But the files in that directory are compressed using lz4, so new lz4 files are not detected by the following code. How can I detect these new files?
>>>
>>> val list_join_action_stream = ssc.fileStream[LongWritable, Text,
>>>   TextInputFormat](gc.input_dir, (t: Path) => true, false).map(_._2.toString)
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-read-lz4-compressed-data-using-fileStream-of-spark-streaming-tp22868.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
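For reference, the "moved" requirement in the error at the top of the thread means each file should be written somewhere else first and then renamed into the monitored directory, so that it appears there atomically and fully formed. A minimal sketch of that pattern, using `java.nio` on a local filesystem as a stand-in for HDFS (on HDFS the equivalent is `FileSystem.rename()`, which is likewise atomic; the directory and file names here are illustrative only):

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

// Sketch of the "move files into the monitored directory" pattern that the
// FileInputDStream error message refers to. Local java.nio stands in for
// HDFS here; names are hypothetical.
def dropIntoMonitoredDir(data: String, staging: Path,
                         monitored: Path, name: String): Path = {
  // 1. Write the file completely OUTSIDE the directory the stream watches,
  //    so the stream can never observe a half-written file.
  val tmp = staging.resolve(name + ".tmp")
  Files.write(tmp, data.getBytes("UTF-8"))

  // 2. Atomically move (rename) it into the monitored directory. The file
  //    appears there in one step, with a fresh modification time.
  val dst = monitored.resolve(name)
  Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE)
  dst
}
```

On HDFS the same idea from the command line is `hdfs dfs -put` into a staging path followed by `hdfs dfs -mv` into the watched directory.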