Thanks a lot for the help. I was able to apply the Tuple1 functionality to fix my problem. I also moved up to Flink 0.9.
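For anyone reading along, the Tuple1 workaround looks roughly like the sketch below. It is only an illustration under stated assumptions, not the generated code: the object name `Tuple1Sketch`, the helper `project`, and the sample data are hypothetical, and it assumes the Flink Scala API is on the classpath.

```scala
import org.apache.flink.api.scala._

// Hypothetical example: keep a single projected field wrapped in Tuple1
// so that position-based aggregations such as sum(0) remain available.
object Tuple1Sketch {

  // Project a two-field record down to one field, but keep it a tuple.
  def project(x: (Int, Double)): Tuple1[Double] = Tuple1(x._2)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data standing in for a real input file.
    val records = env.fromElements((1, 2.0), (2, 3.5))

    // Because the result is still a tuple type, sum(0) is accepted.
    val total = records.map(project _).sum(0)

    // Assumption: in 0.9 and later print() triggers execution itself;
    // in 0.8 an explicit env.execute() would be needed afterwards.
    total.print()
  }
}
```

Returning the bare `Double` from the map instead of `Tuple1(...)` is what leads to the "Aggregating on field positions is only possible on tuple data types." error.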
However, I have another problem executing generated Scala programs. It seems that a Scala program executed with a Flink 0.9 Job Manager can only use a limited number of operators. I use the Flink quickstart package to generate executable .jar files (using mvn clean package). The following is a simple example program generated by my compiler from a rewritten AQL query of TPCH query Q6. Whenever I pack it into a .jar file and try to execute it using a local job manager, I get a "Class not found" error; however, when I remove any one of the operators it works just fine.

I also ran the example within Eclipse using the old Flink 0.8 quickstart package. Interestingly, it worked fine there, too, no matter how many operators I used.

Does the Scala environment in Flink 0.9 indeed only support a limited number of operators? Is this a configuration issue, and is it possible to increase that number?

This is the query I ran:

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation

object Job {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val $l = env.readCsvFile[(Int,Int,Int,Int,Double,Double,Double,Double,String,String,String,String,String,String,String,String)]("/home/mcs1408/TPCH_data/lineitem.tbl", "\n", "|")
    val val0 = $l.filter( x => x._11 >= "1994-01-01")
    val val1 = val0.filter( x => x._11 < "1995-01-01")
    val val2 = val1.filter( x => x._7 >= -0.01)
    val val3 = val2.filter( x => x._7 < 0.01)
    val val4 = val3.filter( x => x._5 < 24)
    val val5 = val4.map{ x => (x._1, x._2, x._3, x._4, x._5, x._8, x._9, x._10, x._11, x._12, x._13, x._14, x._15, x._16, x._6 * (1 - x._7)) } .sum(14)
    val val6 = val5.map{ x => Tuple1(x._15) } .writeAsCsv("/home/mcs1408/TPCH_data/result", "\n", "|")

    env.execute("Flink Scala API parsed AQL Query")
  }
}

Thanks a lot for any help!
Best regards,
Max Schultze

> If you're using Scala, then you're bound to a maximum of 22 fields in a
> tuple, because the Scala library does not provide larger tuples. You could
> generate your own case classes which have more than the 22 fields, though.
> On Oct 14, 2015 11:30 AM, "Ufuk Celebi" <u...@apache.org> wrote:
>
>> > On 13 Oct 2015, at 16:06, schul...@informatik.hu-berlin.de wrote:
>> >
>> > Hello,
>> >
>> > I am currently working on a compilation unit translating AsterixDB's AQL
>> > into runnable Scala code for Flink's Scala API. During code generation I
>> > discovered some things that are quite hard to work around. I am still
>> > working with Flink version 0.8, so some of the problems I have might
>> > already be fixed in 0.9 and if so please tell me.
>> >
>> > First, whenever a record gets projected down to only a single field (e.g.
>> > by a map or reduce function) it is no longer considered a record, but a
>> > variable of the type of that field. If afterwards I want to apply
>> > additional functions like .sum(0) I get an error message like
>>
>> A workaround is to return Tuple1<X> for this. Then you can run the
>> aggregation. I think that the Tuple0 class has been added after 0.8
>> though.
>>
>> > "Aggregating on field positions is only possible on tuple data types."
>> >
>> > This is the same for all functions (like write or join) as the "record" is
>> > no longer considered a dataset.
>>
>> What do you mean? At least in the current versions, the join projections
>> return a Tuple type as well.
>>
>> > Second, I found that records longer than 22 fields are not supported.
>> > Whenever I have a record that is longer than that I receive a build error
>> > as
>>
>> Flink's Tuple classes go up to Tuple25. You can work around this by using
>> a custom PoJo type, e.g.
>>
>> class TPCHRecord {
>>     public int f0;
>>     ...
>>     public int f99;
>> }
>>
>> If possible, I would suggest to update to the latest 0.9 or the upcoming
>> 0.10 release. A lot of stuff has been fixed since 0.8. I think it will be
>> worth it. If you encounter any problems while doing this, feel free to ask
>> here. :)
>>
>> - Ufuk