Hi All,

Continuing on this discussion... Is there a good reason why
"saveAsNewAPIHadoopFiles" in
org/apache/spark/streaming/api/java/JavaPairDStream.scala
is defined like this -

def saveAsNewAPIHadoopFiles(
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = new Configuration) {
  dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
    outputFormatClass, conf)
}


As pointed out earlier, due to type erasure on the Java side we have to add
this code to keep the compiler quiet -

@SuppressWarnings("unchecked")
Class<? extends OutputFormat<?, ?>> outputFormatClass =
    (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;

It works fine, but it adds a layer of confusion and inconsistency when
compared to its counterpart on the regular RDD....
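
For context, here is what a full call site currently looks like from Java (a
minimal sketch; pairDStream, the prefix/suffix strings, and the key/value
types are illustrative, not taken from Spark itself):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Assume: JavaPairDStream<Text, IntWritable> pairDStream = ...;

// The double cast first erases Class<SequenceFileOutputFormat> down to
// Class<?>, then casts back up to the wildcard-bounded type the Scala
// signature expects; Java has no way to express this without the cast.
@SuppressWarnings("unchecked")
Class<? extends OutputFormat<?, ?>> outputFormatClass =
    (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;

pairDStream.saveAsNewAPIHadoopFiles("prefix", "suffix", Text.class,
    IntWritable.class, outputFormatClass, new Configuration());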


saveAsNewAPIHadoopFile as defined in
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala -

/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F],
    conf: Configuration) {
  rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass,
    outputFormatClass, conf)
}
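
Because F is a real type parameter there, a Java caller can pass the class
literal directly and let the compiler infer F. Roughly (a sketch; pairRDD and
the output path are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Assume: JavaPairRDD<Text, IntWritable> pairRDD = ...;

// F is inferred from the class literal, so no explicit cast is needed.
pairRDD.saveAsNewAPIHadoopFile("/out/path", Text.class, IntWritable.class,
    TextOutputFormat.class, new Configuration());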



So, is it possible to change the code for "saveAsNewAPIHadoopFiles" in
org/apache/spark/streaming/api/java/JavaPairDStream.scala to the following
-

def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[F],
    conf: Configuration = new Configuration) {
  dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
    outputFormatClass, conf)
}


Less confusion, more readability, and better consistency...
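
To make the benefit concrete, the streaming call site would then collapse to
the same shape as the RDD one (a sketch, assuming the proposed signature and
the same illustrative classes as above):

// No @SuppressWarnings, no double cast - the class literal is enough.
pairDStream.saveAsNewAPIHadoopFiles("prefix", "suffix", Text.class,
    IntWritable.class, TextOutputFormat.class, new Configuration());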

-abe


On Mon, Oct 6, 2014 at 1:51 PM, Abraham Jacob <abe.jac...@gmail.com> wrote:

> Sean,
>
> Thanks a ton Sean... This is exactly what I was looking for.
>
> As mentioned in the code -
>
>     // This horrible, separate declaration is necessary to appease the compiler
>     @SuppressWarnings("unchecked")
>     Class<? extends OutputFormat<?,?>> outputFormatClass =
>         (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
>     writableDStream.saveAsNewAPIHadoopFiles(dataDirString + "/oryx",
>         "data", keyWritableClass, messageWritableClass, outputFormatClass,
>         streamingContext.sparkContext().hadoopConfiguration());
>
> I was just having a hard time with the outputFormatClass parameter. The
> Scala code in org/apache/spark/streaming/api/java/JavaPairDStream.scala
> defines saveAsNewAPIHadoopFiles as follows -
>
>   /**
>    * Save each RDD in `this` DStream as a Hadoop file. The file name at
>    * each batch interval is generated based on `prefix` and `suffix`:
>    * "prefix-TIME_IN_MS.suffix".
>    */
>   def saveAsNewAPIHadoopFiles(
>       prefix: String,
>       suffix: String,
>       keyClass: Class[_],
>       valueClass: Class[_],
>       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>       conf: Configuration = new Configuration) {
>     dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
>       outputFormatClass, conf)
>   }
>
> The problem is that a Class[_ <: NewOutputFormat[_, _]] parameter cannot be
> satisfied from Java with a parameterized class literal (say you are using
> TextOutputFormat with Text as the key class and IntWritable as the value
> class): TextOutputFormat<Text, IntWritable>.class is not legal Java, because
> of type erasure. Parameterized types lose their type arguments when they
> are translated to byte code during compilation.
>
>
> Looks like adding this works -
>
> @SuppressWarnings("unchecked")
> Class<? extends OutputFormat<?,?>> outputFormatClass =
>     (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
>
>
> Thanks again Sean...
>
>
> On Mon, Oct 6, 2014 at 12:23 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Here's an example:
>>
>>
>> https://github.com/OryxProject/oryx/blob/master/oryx-lambda/src/main/java/com/cloudera/oryx/lambda/BatchLayer.java#L131
>>
>> On Mon, Oct 6, 2014 at 7:39 PM, Abraham Jacob <abe.jac...@gmail.com>
>> wrote:
>> > Hi All,
>> >
>> > Would really appreciate it if anyone in the community has implemented the
>> > saveAsNewAPIHadoopFiles method in Java, found in
>> > org.apache.spark.streaming.api.java.JavaPairDStream<K,V>.
>> >
>> > Any code snippet or online link would be greatly appreciated.
>> >
>> > Regards,
>> > Jacob
>> >
>> >
>>
>
>
>
> --
> ~
>



-- 
~
