By now I have been able to reproduce the error with some more queries. However, it seems to be a problem only in Flink's local mode; during cluster execution everything works just fine.
Regards,
Max

> Thanks a lot for the help.
>
> I was able to apply the Tuple1 functionality to fix my problem. I also
> moved up to Flink 0.9.
>
> However, I have another problem executing generated Scala programs. It
> seems like a Scala program executed with a Flink 0.9 JobManager only has
> a limited number of usable operators. I use the Flink quickstart package
> to generate executable .jar files (using mvn clean package). The
> following is a simple example program generated by my compiler from a
> rewritten AQL query of TPC-H query Q6. Whenever I pack it into a .jar
> file and try to execute it using a local job manager, I get a "Class not
> found" error, but when I remove any one of the operators it works just
> fine. I also ran the example within Eclipse using the old Flink 0.8
> quickstart package. Interestingly, it worked fine there too, no matter
> how many operators I used. Does the Scala environment in Flink 0.9
> indeed only support a limited number of operators? Is this a
> configuration issue, and is it possible to increase that number?
>
> This is the query I ran:
>
> import org.apache.flink.api.scala._
> import org.apache.flink.api.java.aggregation
>
> object Job {
>   def main(args: Array[String]) {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>
>     val $l = env.readCsvFile[(Int,Int,Int,Int,Double,Double,Double,Double,String,String,String,String,String,String,String,String)]("/home/mcs1408/TPCH_data/lineitem.tbl", "\n", "|")
>     val val0 = $l.filter( x => x._11 >= "1994-01-01")
>     val val1 = val0.filter( x => x._11 < "1995-01-01")
>     val val2 = val1.filter( x => x._7 >= -0.01)
>     val val3 = val2.filter( x => x._7 < 0.01)
>     val val4 = val3.filter( x => x._5 < 24)
>     val val5 = val4.map{ x => (x._1, x._2, x._3, x._4, x._5, x._8, x._9,
>       x._10, x._11, x._12, x._13, x._14, x._15, x._16, x._6 * (1 - x._7)) }
>       .sum(14)
>     val val6 = val5.map{ x => Tuple1(x._15) }
>       .writeAsCsv("/home/mcs1408/TPCH_data/result", "\n", "|")
>
>     env.execute("Flink Scala API parsed AQL Query")
>   }
> }
>
> Thanks a lot for any help!
> Best regards,
> Max Schultze
>
>> If you're using Scala, then you're bound to a maximum of 22 fields in a
>> tuple, because the Scala library does not provide larger tuples. You
>> could generate your own case classes which have more than the 22
>> fields, though.
>>
>> On Oct 14, 2015 11:30 AM, "Ufuk Celebi" <u...@apache.org> wrote:
>>
>>> > On 13 Oct 2015, at 16:06, schul...@informatik.hu-berlin.de wrote:
>>> >
>>> > Hello,
>>> >
>>> > I am currently working on a compilation unit translating AsterixDB's
>>> > AQL into runnable Scala code for Flink's Scala API. During code
>>> > generation I discovered some things that are quite hard to work
>>> > around. I am still working with Flink version 0.8, so some of the
>>> > problems I have might already be fixed in 0.9, and if so please tell
>>> > me.
>>> >
>>> > First, whenever a record gets projected down to only a single field
>>> > (e.g. by a map or reduce function) it is no longer considered a
>>> > record, but a variable of the type of that field. If afterwards I
>>> > want to apply additional functions like .sum(0) I get an error
>>> > message like
>>>
>>> A workaround is to return Tuple1<X> for this. Then you can run the
>>> aggregation. I think that the Tuple0 class has been added after 0.8,
>>> though.
>>>
>>> > "Aggregating on field positions is only possible on tuple data
>>> > types."
>>> >
>>> > This is the same for all functions (like write or join) as the
>>> > "record" is no longer considered a dataset.
>>>
>>> What do you mean? At least in the current versions, the join
>>> projections return a Tuple type as well.
>>>
>>> > Second, I found that records longer than 22 fields are not
>>> > supported. Whenever I have a record that is longer than that I
>>> > receive a build error as
>>>
>>> Flink's Tuple classes go up to Tuple25. You can work around this by
>>> using a custom POJO type, e.g.
>>>
>>> class TPCHRecord {
>>>     public int f0;
>>>     ...
>>>     public int f99;
>>> }
>>>
>>> If possible, I would suggest updating to the latest 0.9 or the
>>> upcoming 0.10 release. A lot of stuff has been fixed since 0.8. I
>>> think it will be worth it. If you encounter any problems while doing
>>> this, feel free to ask here. :)
>>>
>>> – Ufuk
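
For anyone who hits the same aggregation error: below is a minimal,
self-contained sketch of the Tuple1 workaround Ufuk describes. The sample
data, object name, job name, and output path are made up for illustration,
and it assumes the 0.9 Scala API used in this thread.

import org.apache.flink.api.scala._

object Tuple1WorkaroundSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Some (key, value) pairs standing in for real input data.
    val pairs = env.fromElements((1, 2.0), (2, 3.5), (3, 4.5))

    // Mapping to the bare Double would yield a non-tuple DataSet, and
    // .sum(0) would then fail with "Aggregating on field positions is
    // only possible on tuple data types." Wrapping the projected field
    // in Tuple1 keeps the result a tuple type, so the position-based
    // aggregation still works.
    val total = pairs
      .map { x => Tuple1(x._2) }
      .sum(0)

    total.writeAsCsv("/tmp/tuple1-workaround-result") // placeholder path
    env.execute("Tuple1 workaround sketch")
  }
}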
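
And a sketch of the case-class route mentioned above for records that do
not fit into a Scala tuple. The Lineitem fields, paths, and column layout
are illustrative and not the full TPC-H schema; note that case classes
with more than 22 fields require Scala 2.11 or later.

import org.apache.flink.api.scala._

// Illustrative record type. A generated case class could carry far more
// fields than the 22 that a plain Scala tuple allows.
case class Lineitem(
  orderkey: Int,
  quantity: Double,
  extendedprice: Double,
  discount: Double,
  shipdate: String)

object CaseClassSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Assumes a '|'-delimited file whose columns match the case class
    // fields in order.
    val lineitem = env.readCsvFile[Lineitem]("/tmp/lineitem-sample.tbl", "\n", "|")

    // Named fields instead of tuple positions, plus the same Tuple1
    // trick for the position-based aggregation.
    val revenue = lineitem
      .filter(l => l.shipdate >= "1994-01-01" && l.shipdate < "1995-01-01")
      .map(l => Tuple1(l.extendedprice * (1 - l.discount)))
      .sum(0)

    revenue.writeAsCsv("/tmp/case-class-result") // placeholder path
    env.execute("Case class sketch")
  }
}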