Hi Johann!

You can try the Table API. It works with logical tuples that you program against, rather than tuple classes.
Have a look here: https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html

Stephan

On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Johann,
>
> I see three options for your use case.
>
> 1) Generate POJO code at planning time, i.e., when the program is
> composed. This does not work when the program is already running. The
> benefit is that you can use key expressions, have typed fields, and get
> type-specific serializers and comparators.
>
> 2) Use Record, an Object[], or List<Object> (or your own holder with a few
> convenience methods) to store the data untyped and work with KeySelectors
> to extract grouping and join keys. The drawback of this approach is the
> generic serializers (Kryo), which won't be as efficient as the native
> ones. If you know the key types and don't use generic types for keys,
> sorting and joining should still be fast.
>
> 3) A hybrid approach of both, which works without code generation. Use a
> generic holder, e.g., Object[], for your data records but implement your
> own type information, serializers, and comparators. After each operation,
> you can define the type information of the result using the returns()
> method, e.g.:
> myData.map(new MapFunction<Object[], Object[]>).returns(myCustomTypeInfo).
> This approach requires a good understanding of Flink's type system, but if
> done correctly, you can also use expressions or positions to define keys
> and benefit from efficient serialization and binary comparisons. However,
> similar to the first approach, you need to know the schema of the data in
> advance (before the program is executed).
>
> In my opinion the first approach is the best, but as you said it is more
> effort to implement and might not work depending on what information is
> available at which point in time.
>
> Let me know if you have any questions.
>
> Cheers, Fabian
>
> 2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>:
>
>> Hi all,
>>
>> I currently find myself evaluating a use case where I have to deal
>> with wide (i.e. about 50-60 columns, definitely more than the 25
>> supported by the Tuple types), structured data from CSV files, with a
>> schema that is potentially generated dynamically at runtime (or
>> automatically inferred from the CSV file).
>> SparkSQL works very well for this case, because I can generate or
>> infer the schema dynamically at runtime, access fields in UDFs via
>> index or name (via the Row API), generate new schemata for UDF results
>> on the fly, and use those schemata to read and write from/to CSV.
>> Obviously Spark and SparkSQL have other quirks and I'd like to find a
>> good solution to do this with Flink.
>>
>> The main limitation is that I can't seem to have DataSets of
>> arbitrary-length, arbitrary-type (i.e. unknown at compile time)
>> tuples. The Record API/type looks like it was meant to provide
>> something like that, but it appears to be on its way to deprecation
>> and is not well supported by the DataSet APIs (e.g. I can't do a join
>> on Records by field index, nor does the CsvReader API support
>> Records), and it has no concept of field names, either.
>>
>> I thought about generating Java classes for my schemata at runtime
>> (e.g. via Javassist), but that seems like a hack, and I'd probably
>> have to do this for each intermediate schema as well (e.g. when a map
>> operation alters the schema). I haven't tried this avenue yet, so I'm
>> not certain it would actually work, and even less certain that this is
>> a nice and maintainable solution.
>>
>> Can anyone suggest a nice way to deal with this kind of use case? I
>> can prepare an example if that would make it clearer.
>>
>> Thanks,
>> Johann
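
To make option 2 from Fabian's list more concrete, here is a minimal sketch in plain Java, with no Flink dependency, of untyped Object[] rows combined with a schema that is only known at runtime. The names Schema, keyOf, and groupBy are hypothetical helpers made up for this illustration and are not part of any Flink API. In an actual Flink program, the key-extraction logic would presumably live inside a KeySelector instead of a java.util.function.Function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class UntypedRows {

    /** Hypothetical runtime schema: maps field names to positions in an Object[] row. */
    static final class Schema {
        private final Map<String, Integer> positions = new LinkedHashMap<>();

        Schema(String... fieldNames) {
            for (int i = 0; i < fieldNames.length; i++) {
                positions.put(fieldNames[i], i);
            }
        }

        /** Resolves a field name to its position, failing loudly on unknown names. */
        int indexOf(String name) {
            Integer idx = positions.get(name);
            if (idx == null) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            return idx;
        }

        /** Builds a key extractor for one field (the role a KeySelector would play). */
        Function<Object[], Object> keyOf(String name) {
            final int idx = indexOf(name);
            return row -> row[idx];
        }
    }

    /** Groups rows by the extracted key, preserving first-seen key order. */
    static Map<Object, List<Object[]>> groupBy(
            List<Object[]> rows, Function<Object[], Object> key) {
        Map<Object, List<Object[]>> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            groups.computeIfAbsent(key.apply(row), k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        // The schema could just as well be inferred from a CSV header at runtime.
        Schema schema = new Schema("id", "country", "amount");

        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {1, "DE", 10.0});
        rows.add(new Object[] {2, "AT", 5.5});
        rows.add(new Object[] {3, "DE", 7.25});

        Map<Object, List<Object[]>> byCountry = groupBy(rows, schema.keyOf("country"));
        for (Map.Entry<Object, List<Object[]>> e : byCountry.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " row(s)");
        }
    }
}
```

Resolving the field name to an index once, when the extractor is built, keeps the per-row work down to a single array access. The trade-off Fabian describes still applies: the rows themselves are untyped, so a framework would have to fall back to generic serialization for them.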