Hi All,

Continuing this discussion... Is there a good reason why "saveAsNewAPIHadoopFiles" in org/apache/spark/streaming/api/java/JavaPairDStream.scala is defined like this -
    def saveAsNewAPIHadoopFiles(
        prefix: String,
        suffix: String,
        keyClass: Class[_],
        valueClass: Class[_],
        outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
        conf: Configuration = new Configuration) {
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
    }

As pointed out earlier, due to type erasure on the Java side we have to add this code to keep the compiler quiet:

    @SuppressWarnings("unchecked")
    Class<? extends OutputFormat<?,?>> outputFormatClass =
        (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

It works fine, but it adds a layer of confusion and inconsistency when compared to its counterpart for regular RDDs, saveAsNewAPIHadoopFile, as defined in spark/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala:

    /** Output the RDD to any Hadoop-supported file system. */
    def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
        path: String,
        keyClass: Class[_],
        valueClass: Class[_],
        outputFormatClass: Class[F],
        conf: Configuration) {
      rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
    }

So, is it possible to change "saveAsNewAPIHadoopFiles" in org/apache/spark/streaming/api/java/JavaPairDStream.scala to the following?

    def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
        prefix: String,
        suffix: String,
        keyClass: Class[_],
        valueClass: Class[_],
        outputFormatClass: Class[F],
        conf: Configuration = new Configuration) {
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
    }

Less confusion, more readability, and better consistency...
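To make the difference concrete, here is a rough sketch of what a Java caller could write if the generic signature above were adopted. The class, the "pairs" variable, and the Text/IntWritable key/value types are just illustrative; it mirrors the way JavaPairRDD.saveAsNewAPIHadoopFile is already called from Java today:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    public final class ProposedUsageSketch {

        private ProposedUsageSketch() {}

        // "pairs" and "conf" are assumed to come from the surrounding job.
        static void save(JavaPairDStream<Text, IntWritable> pairs, Configuration conf) {
            // With outputFormatClass typed as Class[F], the raw class literal is
            // accepted directly; the @SuppressWarnings("unchecked") double cast
            // from the current workaround is no longer needed.
            pairs.saveAsNewAPIHadoopFiles(
                "prefix", "suffix",
                Text.class, IntWritable.class,
                TextOutputFormat.class,
                conf);
        }
    }

The raw class literal satisfies the Class[F] parameter through Java's ordinary unchecked conversion (at worst an unchecked warning), so the separate cast declaration goes away entirely.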
-abe

On Mon, Oct 6, 2014 at 1:51 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:

> Sean,
>
> Thanks a ton, Sean... This is exactly what I was looking for.
>
> As mentioned in the code -
>
>     // This horrible, separate declaration is necessary to appease the
>     // compiler
>     @SuppressWarnings("unchecked")
>     Class<? extends OutputFormat<?,?>> outputFormatClass =
>         (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
>     writableDStream.saveAsNewAPIHadoopFiles(dataDirString + "/oryx",
>         "data", keyWritableClass, messageWritableClass, outputFormatClass,
>         streamingContext.sparkContext().hadoopConfiguration());
>
> I was just having a hard time with the outputFormatClass parameter. The
> Scala code in org/apache/spark/streaming/api/java/JavaPairDStream.scala
> defines saveAsNewAPIHadoopFiles as the following -
>
>     /**
>      * Save each RDD in `this` DStream as a Hadoop file. The file name at
>      * each batch interval is generated based on `prefix` and `suffix`:
>      * "prefix-TIME_IN_MS.suffix".
>      */
>     def saveAsNewAPIHadoopFiles(
>         prefix: String,
>         suffix: String,
>         keyClass: Class[_],
>         valueClass: Class[_],
>         outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>         conf: Configuration = new Configuration) {
>       dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass,
>         valueClass, outputFormatClass, conf)
>     }
>
> The problem is that the Class[_ <: NewOutputFormat[_, _]] parameter cannot
> be supplied from Java as, say, TextOutputFormat<Text, IntWritable>.class
> (using TextOutputFormat with Text as the key class and IntWritable as the
> value class), because of type erasure: parameterized types lose their type
> arguments when they are translated to bytecode during compilation.
>
> Looks like adding this works -
>
>     @SuppressWarnings("unchecked")
>     Class<? extends OutputFormat<?,?>> outputFormatClass =
>         (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
>
> Thanks again, Sean...
>
> On Mon, Oct 6, 2014 at 12:23 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Here's an example:
>>
>> https://github.com/OryxProject/oryx/blob/master/oryx-lambda/src/main/java/com/cloudera/oryx/lambda/BatchLayer.java#L131
>>
>> On Mon, Oct 6, 2014 at 7:39 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:
>> > Hi All,
>> >
>> > Would really appreciate it if anyone in the community has implemented
>> > the saveAsNewAPIHadoopFiles method in Java, found in
>> > org.apache.spark.streaming.api.java.JavaPairDStream<K,V>.
>> >
>> > Any code snippet or online link would be greatly appreciated.
>> >
>> > Regards,
>> > Jacob
>
> --
> ~

--
~