Thank you very much for the detailed answer. I feel a little dumb asking, but how would that work in Scala (we use Spark 1.0.2)? I cannot figure it out. For example, I am having trouble using UnionPartition and NewHadoopPartition, and even ds.values() is not an option for me (the IDE does not resolve it). Do you have any Scala code that does something similar? Any help is appreciated.

BTW: I am creating the dStream like this:

val ds = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), f, true).map(_._2.toString)
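To show where I am stuck, here is my rough Scala translation of your snippet. I think the Java version compiles because Scala's private[spark] is not enforced at the bytecode level, but scalac rejects it, so the only workaround I found is to put the file in a package under org.apache.spark. The accessor names (parentPartition, serializableHadoopSplit) are simply carried over from your Java code, and all of the names here are just placeholders, so please treat this as an untested sketch:

// Hack: placing the file under org.apache.spark grants access to the
// private[spark] partition classes; not a supported API.
package org.apache.spark.workaround

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.{NewHadoopPartition, UnionPartition}
import org.apache.spark.streaming.dstream.DStream

object ProcessedFiles {
  def trackAndDelete(ds: DStream[String]): Unit = {
    ds.foreachRDD { rdd =>
      // One partition per input file; walk them to recover the file paths.
      val paths = rdd.partitions.flatMap {
        case up: UnionPartition[_] =>
          up.parentPartition match {
            case np: NewHadoopPartition =>
              // toString is "path:start+length"; strip the trailing offsets
              val s = np.serializableHadoopSplit.value.toString
              Some(s.substring(0, s.lastIndexOf(':')))
            case _ => None
          }
        case _ => None
      }
      // ... process the batch here ...
      // then delete each file via the plain Hadoop FileSystem API (the only
      // part of this that does not depend on Spark internals)
      paths.foreach { p =>
        val fs = FileSystem.get(new URI(p), new Configuration())
        fs.delete(new Path(p), false) // false = do not delete recursively
      }
    }
  }
}

Does this look like what you had in mind, or is there a cleaner way that avoids the package trick?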
Thanks,
Markus

On Tue, Feb 3, 2015 at 4:55 AM, Prannoy [via Apache Spark User List] <
ml-node+s1001560n21478...@n3.nabble.com> wrote:

> Hi,
>
> To keep processing the older files as well, you can use fileStream instead
> of textFileStream. It has a parameter that tells it whether to pick up
> files that are already present in the directory.
>
> To delete the processed files, one way is to get the list of all files in
> the dStream. This can be done with the foreachRDD API of the dStream
> returned by fileStream (or textFileStream).
>
> Suppose the dStream is
>
> JavaDStream<String> jpDstream = ssc.textFileStream("path/to/your/folder/");
>
> jpDstream.print();
>
> jpDstream.foreachRDD(
>     new Function<JavaRDD<String>, Void>() {
>         @Override
>         public Void call(JavaRDD<String> arg0) throws Exception {
>             getContentHigh(arg0, ssc);
>             return null;
>         }
>     }
> );
>
> public static <U> void getContentHigh(JavaRDD<String> ds,
>         JavaStreamingContext ssc) {
>
>     // one partition per file picked up by the stream
>     Partition[] listPartitions = ds.rdd().partitions();
>
>     for (int i = 0; i < listPartitions.length; i++) {
>         UnionPartition upp = (UnionPartition) listPartitions[i];
>         NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
>         String fPath = npp.serializableHadoopSplit().value().toString();
>         String[] nT = fPath.split(":");
>         // name is the path of the file picked up for processing. The
>         // processing logic can go inside this loop; once done, you can
>         // delete the file using the path in "name".
>         String name = nT[0];
>     }
> }
>
> Thanks.
>
> On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List]
> <[hidden email]> wrote:
>
>> We are running a Spark streaming job that retrieves files from a
>> directory (using textFileStream).
>> One concern we have is the case where the job is down but files are
>> still being added to the directory.
>> Once the job starts up again, those files are not picked up (since they
>> are not new or changed while the job is running), but we would like them
>> to be processed.
>> Is there a solution for that? Is there a way to keep track of which
>> files have been processed, and can we "force" older files to be picked
>> up? Is there a way to delete the processed files?
>>
>> Thanks!
>> Markus