Here's a working version that we have:
    DStream<Tuple2<Text, Tuple>> hadoopDStream = streamingContext.fileStream(
            "/akhld/lookhere/",
            new Function<Path, Object>() {
                @Override
                public Object call(Path path) throws Exception {
                    // Ignore hidden files (names starting with ".")
                    return !path.getName().startsWith(".");
                }
            },
            true,
            SparkUtil.getManifest(Text.class),
            SparkUtil.getManifest(Tuple.class),
            SparkUtil.getManifest(PigInputFormat.class));

Thanks
Best Regards

On Tue, Sep 23, 2014 at 9:47 AM, Michael Quinlan <mq0...@gmail.com> wrote:

> I'm attempting to code a Java-only implementation accessing the
> StreamingContext.fileStream method and am especially interested in setting
> the boolean "newFilesOnly" to false. Unfortunately my code throws
> exceptions:
>
> Exception in thread "main" java.lang.InstantiationException
>         at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>         at java.lang.Class.newInstance(Class.java:374)
>         at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>
> whenever the files are opened. The exceptions are generated whether or not
> I invoke the longer form of the fileStream method. I can use the
> JavaStreamingContext version successfully, but don't have access to the
> boolean flag in this case. If someone sees an issue with the code below, I
> would be very grateful for a nudge in the right direction.
>
>     SparkConf conf = new SparkConf();
>     conf.setMaster("local[2]");
>     conf.setAppName("SparkStreamingFileTest");
>     conf.set("spark.cores.max", "1");
>     conf.set("spark.executor.memory", "1g");
>
>     List<String> inputjarslist = new ArrayList<String>();
>     inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
>     inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
>     inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");
>
>     //Seq<String> inputjars = asScalaBuffer(inputjarslist);
>     conf.setJars(inputjarslist.toArray(new String[3]));
>
>     StreamingContext scc = new StreamingContext(conf, new Duration(10000));
>
>     Seq<String> thejars = scc.sc().jars();
>     scala.collection.Iterator iter = thejars.iterator();
>     if (!(iter.hasNext())) System.out.println("no jars associated!!");
>     while (iter.hasNext()) {
>         System.out.println("Jar in system: " + iter.next());
>     }
>
>     Function1<Path, Object> f = new AbstractFunction1<Path, Object>() {
>         public Boolean apply(Path input) {
>             return true;
>         }
>     };
>
>     //scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
>     ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
>     ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
>     ClassTag<InputFormat<LongWritable, Text>> t =
>             scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);
>
>     InputDStream<Tuple2<LongWritable, Text>> ans =
>             scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
>     //InputDStream<Tuple2<LongWritable, Text>> ans =
>     //        scc.fileStream("/home/usr/testDataDirectory", k, v, t);
>
>     ans.print();
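
A note on the exception in the quoted code: java.lang.InstantiationException out of
Class.newInstance is thrown when the class to instantiate is abstract or an interface,
and NewHadoopRDD.getPartitions instantiates the InputFormat class exactly this way.
The quoted code builds its third ClassTag from InputFormat.class, the abstract Hadoop
base class, whereas the working version above passes a concrete format
(PigInputFormat). Below is a minimal sketch of the same fileStream call using the
concrete TextInputFormat instead; the class name FileStreamSketch, the directory path,
and the app settings are placeholders, and it assumes the Spark 1.0.2 / Hadoop 2.x
APIs already in use in this thread.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.StreamingContext;
    import org.apache.spark.streaming.dstream.InputDStream;
    import scala.Function1;
    import scala.Tuple2;
    import scala.reflect.ClassTag;
    import scala.runtime.AbstractFunction1;

    public class FileStreamSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("SparkStreamingFileTest");
            StreamingContext scc = new StreamingContext(conf, new Duration(10000));

            // Path filter: accept every file. Returning Boolean satisfies
            // Scala's Function1<Path, Object> from the Java side.
            Function1<Path, Object> filter = new AbstractFunction1<Path, Object>() {
                public Boolean apply(Path path) {
                    return Boolean.TRUE;
                }
            };

            ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
            ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
            // The key change: a concrete InputFormat subclass. The abstract
            // base class InputFormat cannot be created via newInstance(),
            // which is what NewHadoopRDD.getPartitions attempts.
            ClassTag<TextInputFormat> f = scala.reflect.ClassTag$.MODULE$.apply(TextInputFormat.class);

            // newFilesOnly = false: also process files already present in
            // the directory when the streaming context starts.
            InputDStream<Tuple2<LongWritable, Text>> lines =
                    scc.fileStream("/home/usr/testDataDirectory", filter, false, k, v, f);

            lines.print();
            scc.start();
            scc.awaitTermination();
        }
    }

With a concrete format the partitions can be computed, and newFilesOnly = false
should then also pick up files already sitting in the directory when the context
starts, which was the original goal.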