After moving the setting of the parameter to SparkConf initialization, instead of setting it after the context is already initialized, I have it operating reliably on the local filesystem, but not on HDFS. Are there any differences in behavior between these two cases that I should be aware of?
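For reference, this is roughly how the parameter is set now, at SparkConf construction time rather than on the live context (a minimal sketch pulled from the test below; the conf key spark.streaming.minRememberDuration is the one discussed further down the thread, and the value is just an arbitrarily large number of seconds):

final SparkConf sparkConf = new SparkConf()
        .setMaster("local[1]")
        .setAppName("test")
        // set before the JavaStreamingContext exists, so the file stream sees it at creation
        .set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));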
I don't usually use mailing lists or Exchange, so forgive me if this message comes through badly formatted. I plan to port the following code to the Hadoop FS API to generalize the test, so I can understand the actual behavior and verify the desired behavior (a rough sketch of that port is at the end of this message).

public static JavaDStream<String> textFileStreamIncludingExisting(JavaStreamingContext context, String path) {
    return context.fileStream(path, LongWritable.class, Text.class, TextInputFormat.class, v1 -> true, false)
            .map(v1 -> v1._2.toString());
}

@Test
public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception {
    final Path testDir = Files.createTempDirectory("sparkTest");
    final ArrayList<Path> tempFiles = new ArrayList<>();
    // create 20 "old" files
    final int testFileNumberLimit = 20;
    for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++) {
        final Path testFile = Files.createTempFile(testDir, "testFile", "");
        tempFiles.add(testFile);
        final FileWriter fileWriter = new FileWriter(testFile.toFile());
        fileWriter.write("asdf");
        fileWriter.flush();
        fileWriter.close();
        // set file dates 0 to 19 days ago
        for (String eachAttribute : new String[]{"basic:lastAccessTime", "basic:lastModifiedTime", "basic:creationTime"}) {
            Files.setAttribute(testFile, eachAttribute,
                    FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
        }
    }

    final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
    sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
    final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    final JavaDStream<String> input = SparkUtil.textFileStreamIncludingExisting(context, String.valueOf(testDir.toUri()));

    // count records read
    final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);

    // set up async wait
    final Semaphore done = new Semaphore(1);
    done.acquire();
    input.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> v1) throws Exception {
            // an empty batch means the existing files have been drained
            if (v1.count() == 0) {
                done.release();
            }
            accumulator.add((int) v1.count());
            return null;
        }
    });

    context.start();
    // wait for completion or 20 sec
    done.tryAcquire(20, TimeUnit.SECONDS);
    context.stop();

    assertThat(accumulator.value(), is(testFileNumberLimit));

    for (Path eachTempFile : tempFiles) {
        Files.deleteIfExists(eachTempFile);
    }
    Files.deleteIfExists(testDir);
}

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Wednesday, July 15, 2015 00:01
To: Terry Hole
Cc: Hunter Morgan; user@spark.apache.org
Subject: Re: fileStream with old files

It was added, but it's not documented publicly. I am planning to change the name of the conf to spark.streaming.fileStream.minRememberDuration to make it easier to understand.

On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hujie.ea...@gmail.com> wrote:

A new configuration named spark.streaming.minRememberDuration was added in 1.2.1 to control the file stream input; the default value is 60 seconds. You can set it to a larger value to include older files (older than 1 minute).

You can get the details from this JIRA: https://issues.apache.org/jira/browse/SPARK-3276

-Terry

On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <hunter.mor...@rackspace.com> wrote:

It's not as odd as it sounds. I want to ensure that long streaming job outages can recover all the files that went into a directory while the job was down.
I've looked at http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039, http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435, and https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e, but they all seem unhelpful. I've tested combinations of the following:
* fileStreams created with dumb accept-all filters,
* newFilesOnly true and false,
* tweaking minRememberDuration to high and low values,
* on HDFS or a local directory.

The problem is that it will not read files in the directory from more than a minute ago.

JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir, LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);

I also tried it having set:

context.sparkContext().getConf().set("spark.streaming.minRememberDuration", "1654564");

to big/small values. Are there known limitations of newFilesOnly=false? Am I doing something wrong?
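P.S. As mentioned above, here is a rough sketch of the planned port of the test-file setup to the Hadoop FS API, so the same code can exercise both the local filesystem and HDFS. The class and helper names and the directory layout are placeholders, and it assumes FileSystem.setTimes is allowed to backdate modification times on the target filesystem:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BackdatedFileFixture {
    // Hypothetical helper: create numberOfFiles one-line files under testDir on whatever
    // FileSystem the path resolves to (file:// or hdfs://), backdating each file's
    // modification time by 0..numberOfFiles-1 days, mirroring the NIO-based setup above.
    public static void createBackdatedFiles(Configuration hadoopConf, Path testDir, int numberOfFiles)
            throws IOException {
        final FileSystem fs = testDir.getFileSystem(hadoopConf);
        fs.mkdirs(testDir);
        for (int i = 0; i < numberOfFiles; i++) {
            final Path testFile = new Path(testDir, "testFile" + i);
            try (FSDataOutputStream out = fs.create(testFile, true)) {
                out.write("asdf".getBytes(StandardCharsets.UTF_8));
            }
            final long mtimeMillis = Instant.now().minus(Duration.ofDays(i)).toEpochMilli();
            // FileSystem.setTimes takes millisecond timestamps; -1 leaves the access time unchanged
            fs.setTimes(testFile, mtimeMillis, -1);
        }
    }
}

The idea is that the rest of the test stays the same and only the fixture changes, so any difference between local and HDFS handling of backdated modification times should show up directly in the assertion.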