Andrew Palumbo created FLINK-3605:
-------------------------------------

             Summary: DataSet.output(HadoopOutput) always expects a LongWritable as a key and throws an exception when supplied with other Writable Key Types
                 Key: FLINK-3605
                 URL: https://issues.apache.org/jira/browse/FLINK-3605
             Project: Flink
          Issue Type: Bug
          Components: DataSet API, Scala API
    Affects Versions: 1.0.0
            Reporter: Andrew Palumbo
When trying to output a {{DataSet}} as a Hadoop sequence file using the Scala API, an exception is thrown when anything other than a {{LongWritable}} is supplied as the key. The following simple unit test demonstrates this:

{code}
test("Simple DataSet with IntWritable Key and Value") {
  val path = TmpDir + "flinkOutput"

  implicit val typeInfo = createTypeInformation[(Int, Int)]

  val ds = env.fromElements[(Int, Int)]((1, 2), (3, 4), (5, 6), (7, 8))

  // convert the Int tuples into Hadoop IntWritables
  val writableDataset: DataSet[(IntWritable, IntWritable)] =
    ds.map(tuple => (new IntWritable(tuple._1), new IntWritable(tuple._2)))

  val job: Job = new Job()

  // set up the sink for IntWritable keys and values
  val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable]
  val hadoopOutput = new HadoopOutputFormat[IntWritable, IntWritable](sequenceFormat, job)
  FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))

  writableDataset.output(hadoopOutput)
  env.execute(s"dfsWrite($path)")
}
{code}

The above code throws the following exception:

{code}
{...}
1 [DataSink (org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat@4d518b32) (1/1)] ERROR org.apache.flink.runtime.operators.DataSinkTask - Error in user code: wrong key class: org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.LongWritable: DataSink (org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat@4d518b32) (1/1)
java.io.IOException: wrong key class: org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.LongWritable
	at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1305)
	at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:83)
	at org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat.writeRecord(HadoopOutputFormat.scala:30)
	at org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat.writeRecord(HadoopOutputFormat.scala:26)
	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:200)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
{code}

When trying to write a {{DataSet}} to a Hadoop sequence file, the API seems to always expect a key of type {{LongWritable}} and a value of type {{Text}}. Similar exceptions are thrown when other {{Writable}} types are used as keys.
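Note that {{LongWritable}}/{{Text}} are exactly Hadoop's default output key/value classes, so one possible explanation is that {{SequenceFileOutputFormat}} is reading them from a {{Job}} on which they were never set. A minimal workaround sketch under that assumption (untested here, continuing the test above; the sink setup is otherwise unchanged):

{code}
// Sketch of a possible workaround, assuming the defaults come from the Job
// configuration: declare the output key/value classes explicitly so the
// record writer does not fall back to Hadoop's defaults
// (LongWritable keys, Text values).
val job: Job = new Job()
job.setOutputKeyClass(classOf[IntWritable])
job.setOutputValueClass(classOf[IntWritable])

val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable]
val hadoopOutput = new HadoopOutputFormat[IntWritable, IntWritable](sequenceFormat, job)
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
writableDataset.output(hadoopOutput)
{code}

If that makes the test pass, it would suggest the Flink wrapper should either propagate the key/value types to the {{Job}} or document that the caller must set them.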