Hi Johann!

You can try the Table API. It lets you program with logical tuples
rather than tuple classes.

Have a look here:
https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html

Stephan


On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Johann,
>
> I see three options for your use case.
>
> 1) Generate Pojo code at planning time, i.e., when the program is
> composed. This does not work when the program is already running. The
> benefit is that you can use key expressions, have typed fields, and type
> specific serializers and comparators.
>
> 2) Use Record, an Object[], or List<Object> (or some own holder with a few
> convenience methods) to store the data untyped and work with KeySelectors
> to extract grouping and join keys. The drawback of this approach is that it
> falls back to generic serializers (Kryo), which won't be as efficient as the
> native ones. If you know the key types and don't use generic types for keys,
> sorting and joining should still be fast.
>
> 3) A hybrid approach of both, which works without code generation. Use a
> generic holder, e.g., Object[], for your data records but implement your own
> type information, serializers, and comparators. After each operation, you
> can define the type information of the result using the returns() method,
> e.g.:
> myData.map(new MapFunction<Object[], Object[]>() { ... }).returns(myCustomTypeInfo);
> This approach requires a good understanding of Flink's type system, but if
> done correctly, you can also use expressions or positions to define keys
> and benefit from efficient serialization and binary comparisons. However,
> similar to the first approach, you need to know the schema of the data in
> advance (before the program is executed).
>
> In my opinion the first approach is the best, but as you said it is more
> effort to implement and might not work depending on what information is
> available at which point in time.
>
> Let me know if you have any questions.
>
> Cheers, Fabian
>
> 2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>:
>
>> Hi all,
>>
>> I currently find myself evaluating a use case, where I have to deal
>> with wide (i.e. about 50-60 columns, definitely more than the 25
>> supported by the Tuple types), structured data from CSV files, with a
>> potentially dynamically (during runtime) generated (or automatically
>> inferred from the CSV file) schema.
>> SparkSQL works very well for this case, because I can generate or
>> infer the schema dynamically at runtime, access fields in UDFs via
>> index or name (via the Row API), generate new schemata for UDF results
>> on the fly, and use those schemata to read and write from/to CSV.
>> Obviously Spark and SparkSQL have other quirks and I'd like to find a
>> good solution to do this with Flink.
>>
>> The main limitation seems to be that I can't have DataSets of
>> arbitrary-length, arbitrary-type (i.e. unknown at compile time)
>> tuples. The Record API/type looks like it was meant to provide
>> something like that, but it appears to be headed for deprecation and is
>> not well supported by the DataSet APIs (e.g. I can't do a join on Records
>> by field index, nor does the CsvReader API support Records), and it
>> has no concept of field names, either.
>>
>> I thought about generating Java classes of my schemata at runtime (e.g.
>> via Javassist), but that seems like a hack, and I'd probably have to
>> do this for each intermediate schema as well (e.g. when a map
>> operation alters the schema). I haven't tried this avenue yet, so I'm
>> not certain it would actually work, and even less certain that this is
>> a nice and maintainable solution.
>>
>> Can anyone suggest a nice way to deal with this kind of use case? I
>> can prepare an example if that would make it more clear.
>>
>> Thanks,
>> Johann
>>
>
>
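
For reference, option 2 from Fabian's reply (an untyped holder with a few
convenience methods, plus key extraction) could be sketched roughly as below.
This is plain Java with no Flink dependency, so it only illustrates the idea.
The names Schema, Row, stringKey and groupBy are hypothetical and not part of
Flink: stringKey stands in for what a Flink KeySelector would do, and groupBy
for the grouping that the DataSet API would perform.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DynamicRowSketch {

    /** Hypothetical schema: field names mapped to positions, built at runtime. */
    static final class Schema {
        private final Map<String, Integer> positions = new LinkedHashMap<>();

        Schema(String... fieldNames) {
            for (int i = 0; i < fieldNames.length; i++) {
                if (positions.put(fieldNames[i], i) != null) {
                    throw new IllegalArgumentException("duplicate field: " + fieldNames[i]);
                }
            }
        }

        int indexOf(String name) {
            Integer pos = positions.get(name);
            if (pos == null) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            return pos;
        }

        int arity() {
            return positions.size();
        }
    }

    /** Hypothetical untyped holder: an Object[] with access by index or by name. */
    static final class Row {
        private final Schema schema;
        private final Object[] values;

        Row(Schema schema, Object... values) {
            if (values.length != schema.arity()) {
                throw new IllegalArgumentException("expected " + schema.arity() + " values");
            }
            this.schema = schema;
            this.values = values.clone();
        }

        Object get(int index) {
            return values[index];
        }

        Object get(String name) {
            return values[schema.indexOf(name)];
        }
    }

    /**
     * Plays the role of a key selector: the field name is resolved to a position
     * once, and the key is returned with a known, non-generic type (String).
     */
    static Function<Row, String> stringKey(Schema schema, String field) {
        final int pos = schema.indexOf(field);
        return row -> (String) row.get(pos);
    }

    /** Local stand-in for a grouping operation; keeps keys in first-seen order. */
    static Map<String, List<Row>> groupBy(List<Row> rows, Function<Row, String> key) {
        return rows.stream()
                .collect(Collectors.groupingBy(key, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        // In the real use case the field names would come from the CSV header.
        Schema schema = new Schema("user", "country", "clicks");
        List<Row> rows = Arrays.asList(
                new Row(schema, "anna", "DE", 3),
                new Row(schema, "ben", "US", 5),
                new Row(schema, "carl", "DE", 7));

        Map<String, List<Row>> byCountry = groupBy(rows, stringKey(schema, "country"));
        System.out.println(byCountry.keySet());          // prints [DE, US]
        System.out.println(byCountry.get("DE").size());  // prints 2
    }
}
```

Resolving the field position once and returning a concretely typed key follows
Fabian's point that keys should not be generic types. The records themselves
would still go through a generic serializer in Flink.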
