Thanks a lot for the help.

I was able to apply the Tuple1 workaround to fix my problem. I have also
moved up to Flink 0.9.
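
For reference, the fix essentially boils down to wrapping the single remaining
field in a Tuple1 before aggregating, roughly like this (a simplified sketch;
the dataset name "lineitems" and the field positions are only placeholders):

// keep the single projected field inside a Tuple1 so that position-based
// aggregations such as sum(0) can still be applied
val revenue = lineitems.map { x => Tuple1(x._6 * (1 - x._7)) }
val total = revenue.sum(0)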

However, I have run into another problem when executing generated Scala
programs. It seems that a Scala program executed with a Flink 0.9 job manager
can only use a limited number of operators. I use the Flink quickstart package
to generate executable .jar files (using mvn clean package). The following is
a simple example program generated by my compiler from a rewritten AQL query
of TPCH query Q6. Whenever I pack it into a .jar file and try to execute it
using a local job manager, I get a "Class not found" error; however, when I
remove any one of the operators, it works just fine. I also ran the example
within Eclipse using the old Flink 0.8 quickstart package. Interestingly, it
worked fine there too, no matter how many operators I used. Does the Scala
environment in Flink 0.9 really limit the number of usable operators? Is this
a configuration issue, and if so, is it possible to increase that number?

This is the query I ran:

import org.apache.flink.api.scala._
import org.apache.flink.api.java.aggregation

object Job {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val $l = env.readCsvFile[(Int, Int, Int, Int, Double, Double, Double, Double,
        String, String, String, String, String, String, String, String)](
      "/home/mcs1408/TPCH_data/lineitem.tbl", "\n", "|")
    val val0 = $l.filter( x => x._11 >= "1994-01-01")
    val val1 = val0.filter( x => x._11 < "1995-01-01")
    val val2 = val1.filter( x => x._7 >= -0.01)
    val val3 = val2.filter( x => x._7 < 0.01)
    val val4 = val3.filter( x => x._5 < 24)
    val val5 = val4.map{ x => (x._1, x._2, x._3, x._4, x._5, x._8, x._9,
        x._10, x._11, x._12, x._13, x._14, x._15, x._16, x._6 * (1 - x._7)) }
      .sum(14)
    val val6 = val5.map{ x => Tuple1(x._15) }
      .writeAsCsv("/home/mcs1408/TPCH_data/result", "\n", "|")

    env.execute("Flink Scala API parsed AQL Query")
  }
}

Thanks a lot for any help!
Best regards,
Max Schultze



> If you're using Scala, then you're bound to a maximum of 22 fields in a
> tuple, because the Scala library does not provide larger tuples. You could
> generate your own case classes which have more than the 22 fields, though.
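> A rough sketch of what such a generated case class could look like (purely
> illustrative, with placeholder field names, and assuming Scala 2.11 or
> later, where case classes may have more than 22 parameters):
>
> case class WideRecord(
>   f0: Int, f1: Int, f2: Int, f3: Int, f4: Double, f5: Double, f6: Double,
>   f7: Double, f8: String, f9: String, f10: String, f11: String,
>   f12: String, f13: String, f14: String, f15: String, f16: String,
>   f17: String, f18: String, f19: String, f20: String, f21: String,
>   f22: String, f23: String)
>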
> On Oct 14, 2015 11:30 AM, "Ufuk Celebi" <u...@apache.org> wrote:
>
>>
>> > On 13 Oct 2015, at 16:06, schul...@informatik.hu-berlin.de wrote:
>> >
>> > Hello,
>> >
>> > I am currently working on a compilation unit translating AsterixDB's
>> AQL
>> > into runnable Scala code for Flink's Scala API. During code generation
>> I
>> > discovered some things that are quite hard to work around. I am still
>> > working with Flink version 0.8, so some of the problems I have might
>> > already be fixed in 0.9 and if so please tell me.
>> >
>> > First, whenever a record gets projected down to only a single field
>> (e.g.
>> > by a map or reduce function) it is no longer considered a record, but
>> a
>> > variable of the type of that field. If afterwards I want to apply
>> > additional functions like .sum(0) I get an error message like
>>
>> A workaround is to return Tuple1<X> for this. Then you can run the
>> aggregation. I think that the Tuple0 class has been added after 0.8
>> though.
>>
>> > "Aggregating on field positions is only possible on tuple data types."
>> >
>> > This is the same for all functions (like write or join) as the
>> "record"
>> is
>> > no longer considered a dataset.
>>
>> What do you mean? At least in the current versions, the join projections
>> return a Tuple type as well.
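>>
>> For example (just a minimal sketch; the dataset names ds1 and ds2 are
>> placeholders), a default join in the Scala API produces tuples of the
>> joined elements, so positional operations keep working:
>>
>> val joined = ds1.join(ds2).where(0).equalTo(0)  // DataSet[(A, B)], a tuple type
>> val projected = joined.map { case (a, b) => (a._1, b._1) }  // still a tuple data set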
>>
>> > Second, I found that records longer than 22 fields are not supported.
>> > Whenever I have a record that is longer than that I receive a build
>> error
>> > as
>>
>> Flink’s Tuple classes go up to Tuple25. You can work around this by
>> using
>> a custom PoJo type, e.g.
>>
>> class TPCHRecord {
>>     public int f0;
>>     ...
>>     public int f99;
>> }
>>
>> If possible, I would suggest to update to the latest 0.9 or the upcoming
>> 0.10 release. A lot of stuff has been fixed since 0.8. I think it will
>> be
>> worth it. If you encounter any problems while doing this, feel free to
>> ask
>> here. :)
>>
>> – Ufuk
>

