Hi Johann,

I see three options for your use case.

1) Generate POJO code at planning time, i.e., when the program is composed.
This does not work when the program is already running. The benefit is that
you can use key expressions, have typed fields, and get type-specific
serializers and comparators.
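
For illustration, here is a minimal sketch of what such a generated POJO and
its use could look like. The schema (an id and an amount field), the class
names, and the input path are just assumptions for the example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class GeneratedPojoExample {

  // Hypothetical POJO that code generation would emit for a schema
  // (id: Long, amount: Double).
  public static class MyRecord {
    public Long id;        // public fields -> valid Flink POJO, typed serializers
    public Double amount;
    public MyRecord() {}   // POJOs need a public no-argument constructor
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Key expressions ("id") and the CSV reader work directly on the typed fields.
    DataSet<MyRecord> data = env
        .readCsvFile("input.csv")
        .pojoType(MyRecord.class, "id", "amount");
    data.groupBy("id").first(1).print();
  }
}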

2) Use Record, an Object[], or a List<Object> (or your own holder class with
a few convenience methods) to store the data untyped, and work with
KeySelectors to extract grouping and join keys. The drawback of this approach
is the use of generic serializers (Kryo), which won't be as efficient as the
native ones. If you know the key types and don't use generic types for keys,
sorting and joining should still be fast.
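
A small sketch of that approach, assuming the key sits at position 0 and is a
String (the data and class name are made up):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;

public class KeySelectorExample {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Untyped records; Flink falls back to a generic (Kryo) serializer for them.
    DataSet<Object[]> rows = env.fromElements(
        new Object[]{"a", 1}, new Object[]{"b", 2}, new Object[]{"a", 3});

    // The KeySelector extracts a typed key, so grouping and joining on it
    // can still use the key's native serializer and comparator.
    rows.groupBy(new KeySelector<Object[], String>() {
          @Override
          public String getKey(Object[] row) {
            return (String) row[0];  // assumed: key at position 0
          }
        })
        .first(1)
        .print();
  }
}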

3) A hybrid of both approaches, which works without code generation. Use a
generic holder, e.g., Object[], for your data records, but implement your own
type information, serializers, and comparators. After each operation, you can
declare the type information of the result with the returns() method, e.g.,
myData.map(new MapFunction<Object[], Object[]>(){...}).returns(myCustomTypeInfo);
This approach requires a good understanding of Flink's type system, but if
done correctly, you can also use expressions or positions to define keys and
benefit from efficient serialization and binary comparisons. However, similar
to the first approach, you need to know the schema of the data in advance
(before the program is executed).
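
To give an idea of the shape of such a custom type information, here is a
skeleton. The class name and field layout are made up, a real implementation
also needs a matching TypeSerializer (and a TypeComparator for key support),
and depending on the Flink version there are a few more abstract methods to
implement:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;

// Skeleton of a schema-aware TypeInformation for Object[] records.
public class MyRowTypeInfo extends TypeInformation<Object[]> {

  private final TypeInformation<?>[] fieldTypes;  // one entry per column

  public MyRowTypeInfo(TypeInformation<?>... fieldTypes) {
    this.fieldTypes = fieldTypes;
  }

  @Override public boolean isBasicType() { return false; }
  @Override public boolean isTupleType() { return false; }
  @Override public int getArity() { return fieldTypes.length; }
  @Override public int getTotalFields() { return fieldTypes.length; }
  @Override public Class<Object[]> getTypeClass() { return Object[].class; }
  @Override public boolean isKeyType() { return false; }  // true once comparators exist

  @Override
  public TypeSerializer<Object[]> createSerializer(ExecutionConfig config) {
    // A real implementation would return a serializer that delegates to the
    // serializers of the field types; omitted in this sketch.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }

  // Illustrates declaring the result type of a map over Object[] records with
  // returns(); the result schema (String, Integer) is assumed for the example.
  public static DataSet<Object[]> declareResultType(DataSet<Object[]> myData) {
    return myData
        .map(new MapFunction<Object[], Object[]>() {
          @Override
          public Object[] map(Object[] value) {
            return value;  // identity map, just to show the pattern
          }
        })
        .returns(new MyRowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
  }
}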

In my opinion, the first approach is the better one, but as you said, it is
more effort to implement and might not work depending on what information is
available at which point in time.

Let me know if you have any questions.

Cheers, Fabian

2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>:

> Hi all,
>
> I currently find myself evaluating a use case, where I have to deal
> with wide (i.e. about 50-60 columns, definitely more than the 25
> supported by the Tuple types), structured data from CSV files, with a
> potentially dynamically (during runtime) generated (or automatically
> inferred from the CSV file) schema.
> SparkSQL works very well for this case, because I can generate or
> infer the schema dynamically at runtime, access fields in UDFs via
> index or name (via the Row API), generate new schemata for UDF results
> on the fly, and use those schemata to read and write from/to CSV.
> Obviously Spark and SparkSQL have other quirks and I'd like to find a
> good solution to do this with Flink.
>
> The main limitation seems to be that I can't seem to have DataSets of
> arbitrary-length, arbitrary-type (i.e. unknown during compile time),
> tuples. The Record API/type looks like it was meant to provide
> something like that but it appears to become deprecated and is not
> well supported by the DataSet APIs (e.g. I can't do a join on Records
> by field index, nor does the CsvReader API support Records), and it
> has no concept of field names, either.
>
> I thought about generating Java classes for my schemata at runtime (e.g.
> via Javassist), but that seems like a hack, and I'd probably have to
> do this for each intermediate schema as well (e.g. when a map
> operation alters the schema). I haven't tried this avenue yet, so I'm
> not certain it would actually work, and even less certain that this is
> a nice and maintainable solution.
>
> Can anyone suggest a nice way to deal with this kind of use case? I
> can prepare an example if that would make it more clear.
>
> Thanks,
> Johann
>
