After moving the setting of the parameter into the SparkConf initialization, instead 
of setting it after the context is already initialized, I have it operating reliably 
on the local filesystem, but not on HDFS. Are there any differences in behavior 
between these two cases that I should be aware of?
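
To make the fix concrete, the difference is roughly this (a sketch, not the exact 
test code below; the duration value is only an example):

// works: the property is set on the SparkConf before the streaming context exists
SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("test");
conf.set("spark.streaming.minRememberDuration", "86400"); // seconds; example value
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

// did not work for me: mutating the conf after the context is already initialized
// ssc.sparkContext().getConf().set("spark.streaming.minRememberDuration", "86400");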

I don't usually use mailing lists or Exchange, so forgive me if this message's 
formatting goes horribly wrong.

I plan to port the following code to the Hadoop FS API so the test also runs 
against HDFS, which should show the actual behavior there and let me verify the 
desired behavior.
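
Roughly what I have in mind for the port (an untested sketch; the 
FileSystem.get/create/setTimes calls are my assumption of how to reproduce the 
back-dated files on HDFS, and testDirUri is a placeholder for the target 
directory URI):

// uses org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem,
// FSDataOutputStream and org.apache.hadoop.fs.Path -- note the name clash with
// java.nio.file.Path used in the JUnit test below
Configuration hadoopConf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(testDirUri), hadoopConf);
for (int testFileNumber = 0; testFileNumber < 20; testFileNumber++)
{
    org.apache.hadoop.fs.Path testFile =
            new org.apache.hadoop.fs.Path(testDirUri, "testFile" + testFileNumber);
    try (FSDataOutputStream out = fs.create(testFile))
    {
        out.writeBytes("asdf");
    }
    // back-date modification and access times by testFileNumber days
    // (HDFS has no creation time, unlike the basic:creationTime attribute below)
    long timestamp = Instant.now().minus(Duration.ofDays(testFileNumber)).toEpochMilli();
    fs.setTimes(testFile, timestamp, timestamp);
}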

// Builds a text-line DStream over `path` that also includes files already present
// in the directory (accept-all filter, newFilesOnly = false).
public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path)
{
    return context
            .fileStream(path, LongWritable.class, Text.class, TextInputFormat.class, v1 -> true, false)
            .map(v1 -> v1._2().toString());
}

@Test
public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception
{
    final Path testDir = Files.createTempDirectory("sparkTest");
    final ArrayList<Path> tempFiles = new ArrayList<>();

    // create 20 "old" files, back-dated 0 to 19 days
    final int testFileNumberLimit = 20;
    for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++)
    {
        final Path testFile = Files.createTempFile(testDir, "testFile", "");
        tempFiles.add(testFile);
        final FileWriter fileWriter = new FileWriter(testFile.toFile());
        fileWriter.write("asdf");
        fileWriter.flush();
        fileWriter.close();
        // set access, modification, and creation times testFileNumber days in the past
        for (String eachAttribute : new String[]{"basic:lastAccessTime", "basic:lastModifiedTime",
                "basic:creationTime"})
        {
            Files.setAttribute(testFile, eachAttribute,
                    FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
        }
    }

    // the remember-duration property has to be set on the SparkConf before the
    // streaming context is created; setting it afterwards did not take effect
    final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
    sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
    final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    final JavaDStream<String> input =
            SparkUtil.textFileStreamIncludingExisting(context, String.valueOf(testDir.toUri()));
    // count records read (each file holds a single line, so this equals the number of files)
    final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);

    // set up async wait: the semaphore is released once an empty batch arrives,
    // i.e. all pre-existing files have been processed
    Semaphore done = new Semaphore(1);
    done.acquire();
    input.foreachRDD(new Function<JavaRDD<String>, Void>()
    {
        @Override
        public Void call(JavaRDD<String> v1) throws Exception
        {
            if (v1.count() == 0)
            {
                done.release();
            }
            accumulator.add((int) v1.count());
            return null;
        }
    });
    context.start();
    // wait for completion or 20 sec
    done.tryAcquire(20, TimeUnit.SECONDS);
    context.stop();

    assertThat(accumulator.value(), is(testFileNumberLimit));

    for (Path eachTempFile : tempFiles)
    {
        Files.deleteIfExists(eachTempFile);
    }
    Files.deleteIfExists(testDir);
}


From: Tathagata Das [mailto:t...@databricks.com]
Sent: Wednesday, July 15, 2015 00:01
To: Terry Hole
Cc: Hunter Morgan; user@spark.apache.org
Subject: Re: fileStream with old files

It was added, but it's not documented publicly. I am planning to rename the conf 
to spark.streaming.fileStream.minRememberDuration to make it easier to 
understand.
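
For example (the second name is only planned at this point, not in any released 
version):

sparkConf.set("spark.streaming.minRememberDuration", "86400"); // current, undocumented name
// sparkConf.set("spark.streaming.fileStream.minRememberDuration", "86400"); // planned name, not available yet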

On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
A new configuration named spark.streaming.minRememberDuration was added in 
1.2.1 to control the file stream input. The default value is 60 seconds; you 
can set it to a larger value to include older files (older than 1 minute).

You can get the details from this JIRA: 
https://issues.apache.org/jira/browse/SPARK-3276

-Terry

On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <hunter.mor...@rackspace.com> wrote:
It's not as odd as it sounds. I want to ensure that after a long streaming job
outage, the job can recover all the files that landed in a directory while it
was down.
I've looked at
http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
and
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
and
https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
but none of them helped.
I've tested combinations of the following:
 * fileStreams created with dumb accept-all filters
 * newFilesOnly true and false,
 * tweaking minRememberDuration to high and low values,
 * on HDFS or a local directory.
The problem is that it will not read files in the directory that are more than a
minute old.
JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
        LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
I also tried setting
context.sparkContext().getConf().set("spark.streaming.minRememberDuration", "1654564");
with both large and small values.

Are there known limitations of newFilesOnly=false? Am I doing something
wrong?





