Thank you very much for the detailed answer. I feel a little dumb asking,
but how would that work in Scala (we use Spark 1.0.2)?
I cannot figure it out. For example, I am having trouble using UnionPartition
and NewHadoopPartition, and even ds.values() is not an option for me (in the
IDE). Do you have any Scala code that does something similar? Any help is
appreciated.
BTW: I am creating the dStream like this:
val ds = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), f,
true).map(_._2.toString)
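
This is the closest I have come, translating your Java snippet directly
(just a sketch of my attempt; it does not compile for me, and I suspect
UnionPartition and NewHadoopPartition are private[spark], which would
explain the IDE errors):

  import org.apache.spark.rdd.{NewHadoopPartition, UnionPartition}

  ds.foreachRDD { rdd =>
    // each partition of the batch RDD should correspond to one picked-up file
    rdd.partitions.foreach { p =>
      val up = p.asInstanceOf[UnionPartition[_]]
      val np = up.parentPartition.asInstanceOf[NewHadoopPartition]
      val fPath = np.serializableHadoopSplit.value.toString
      val name = fPath.split(":")(0) // path of the file being processed
      println(name)
    }
  }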

Thanks,
Markus

On Tue, Feb 3, 2015 at 4:55 AM, Prannoy [via Apache Spark User List] wrote:

> Hi,
>
> To also process files that are already present when the job starts, you can
> use fileStream instead of textFileStream. It has a parameter that tells it
> to pick up files already in the directory.
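>
> In Scala, for example (a sketch; the third argument, newFilesOnly, set to
> false makes Spark also process pre-existing files):
>
>   import org.apache.hadoop.fs.Path
>   import org.apache.hadoop.io.{LongWritable, Text}
>   import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
>   // filter: accept every file; newFilesOnly = false: include old files
>   val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
>     "path/to/your/folder/", (path: Path) => true, newFilesOnly = false
>   ).map(_._2.toString)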
>
> For deleting the processed files, one way is to get the list of all files
> in the dStream. This can be done by using the foreachRDD API of the dStream
> returned by fileStream (or textFileStream).
>
> Suppose the dStream is
>
>   import org.apache.spark.Partition;
>   import org.apache.spark.api.java.JavaRDD;
>   import org.apache.spark.api.java.function.Function;
>   import org.apache.spark.rdd.NewHadoopPartition;
>   import org.apache.spark.rdd.UnionPartition;
>   import org.apache.spark.streaming.api.java.JavaDStream;
>   import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
>   JavaDStream<String> jpDstream =
>       ssc.textFileStream("path/to/your/folder/");
>
>   jpDstream.print();
>
>   jpDstream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>     @Override
>     public Void call(JavaRDD<String> rdd) throws Exception {
>       getContentHigh(rdd, ssc);
>       return null;
>     }
>   });
>
>   public static void getContentHigh(JavaRDD<String> ds,
>       JavaStreamingContext ssc) {
>
>     // each partition of the batch RDD corresponds to one picked-up file
>     Partition[] listPartitions = ds.rdd().partitions();
>     int lenPartition = listPartitions.length; // number of files picked up
>
>     for (int i = 0; i < lenPartition; i++) {
>       UnionPartition upp = (UnionPartition) listPartitions[i];
>       NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
>       String fPath = npp.serializableHadoopSplit().value().toString();
>       String[] nT = fPath.split(":");
>       String name = nT[0]; // name is the path of the file picked up for
>                            // processing; the processing logic can go inside
>                            // this loop. Once done, you can delete the file
>                            // using the path in "name".
>     }
>   }
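>
> If you are on Scala, actually deleting a processed file could then look
> roughly like this (a sketch using the standard Hadoop FileSystem API;
> "name" is the file path recovered above, error handling omitted):
>
>   import java.net.URI
>   import org.apache.hadoop.conf.Configuration
>   import org.apache.hadoop.fs.{FileSystem, Path}
>
>   // resolve the filesystem (local, HDFS, ...) from the file's URI
>   val fs = FileSystem.get(URI.create(name), new Configuration())
>   fs.delete(new Path(name), false) // false = non-recursive delete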
>
>
> Thanks.
>
> On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] wrote:
>
>> We are running a Spark streaming job that retrieves files from a
>> directory (using textFileStream).
>> One concern we are having is the case where the job is down but files are
>> still being added to the directory.
>> Once the job starts up again, those files are not being picked up (since
>> they are not new or changed while the job is running) but we would like
>> them to be processed.
>> Is there a solution for that? Is there a way to keep track what files
>> have been processed and can we "force" older files to be picked up? Is
>> there a way to delete the processed files?
>>
>> Thanks!
>> Markus
>>



