Hi,
thanks once again for those pointers. I did a bit of experimenting the past
couple of days and came to the following conclusions:
1. Unfortunately, I don't think I can get away with option 1 (generating
POJOs at runtime). At least not without generating lots of boilerplate
code, because I'd like to be able to access fields by index or name, also
inside UDFs. Also, instantiating generated classes in UDFs seems like a
pain.
2. Table API looks nice, but doesn't seem to solve the problem of getting
the data into a DataSet in the first place. And I'll likely have to revert
to the DataSet API to do low level operations, such as a simple map(), as
far as I can tell.

However, unless I'm mistaken, the Table API already provides exactly
Fabian's option 3 with its Row, RowTypeInfo, and RowSerializer
implementations. Is that right?
I played around a bit, trying to use a DataSet[Row] and it seems to work
great. I had to add a hack to make the ScalaCsvInputFormat read Rows
instead of Tuples, as well as add a few convenience methods to the Row
class and a Schema API to quickly generate RowTypeInfos. I pushed my
experiments here:
https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df
A toy example of how I'd like to use the APIs is included in
the TableExperiments.scala file.
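
To illustrate the idea independently of Flink, here is a minimal sketch in
plain Java. Schema, Row, and parseLine are hypothetical names made up for
this example; they are not the classes from flink-table or from the commit
above, and the CSV parsing assumes no quoting or escaping. It only shows a
schema built at runtime and fields read by index or by name:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these classes are Flink's own Row or RowTypeInfo. */
public class RowSketch {

    /** Ordered field names and types, built at runtime. */
    static final class Schema {
        private final List<String> names = new ArrayList<>();
        private final List<Class<?>> types = new ArrayList<>();
        private final Map<String, Integer> positions = new HashMap<>();

        /** Appends a field and returns this schema so calls can be chained. */
        Schema field(String name, Class<?> type) {
            if (positions.containsKey(name)) {
                throw new IllegalArgumentException("Duplicate field: " + name);
            }
            positions.put(name, names.size());
            names.add(name);
            types.add(type);
            return this;
        }

        int arity() { return names.size(); }

        Class<?> typeOf(int index) { return types.get(index); }

        int indexOf(String name) {
            Integer index = positions.get(name);
            if (index == null) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            return index;
        }
    }

    /** A record whose fields can be read by position or by name. */
    static final class Row {
        private final Schema schema;
        private final Object[] values;

        Row(Schema schema, Object[] values) {
            if (values.length != schema.arity()) {
                throw new IllegalArgumentException(
                    "Expected " + schema.arity() + " fields, got " + values.length);
            }
            this.schema = schema;
            this.values = values;
        }

        Object get(int index) { return values[index]; }

        Object get(String name) { return values[schema.indexOf(name)]; }
    }

    /** Parses one CSV line against the schema; quoting and escaping are not handled. */
    static Row parseLine(Schema schema, String line) {
        String[] cells = line.split(",", -1);
        if (cells.length != schema.arity()) {
            throw new IllegalArgumentException(
                "Expected " + schema.arity() + " cells, got " + cells.length);
        }
        Object[] values = new Object[cells.length];
        for (int i = 0; i < cells.length; i++) {
            values[i] = convert(cells[i].trim(), schema.typeOf(i));
        }
        return new Row(schema, values);
    }

    /** Converts a cell to the declared field type; only three types are supported here. */
    private static Object convert(String cell, Class<?> type) {
        if (type == String.class) return cell;
        if (type == Integer.class) return Integer.valueOf(cell);
        if (type == Double.class) return Double.valueOf(cell);
        throw new IllegalArgumentException("Unsupported type: " + type.getName());
    }

    public static void main(String[] args) {
        Schema schema = new Schema()
            .field("id", Integer.class)
            .field("name", String.class)
            .field("score", Double.class);
        Row row = parseLine(schema, "7,alice,3.5");
        System.out.println(row.get(0) + " " + row.get("name") + " " + row.get("score"));
    }
}
```

In an actual Flink job the schema would additionally have to be turned into
type information so that efficient serializers and comparators can be used,
which this sketch leaves out entirely.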

Is this a maintainable solution, or is the Row API supposed to be internal
only? If it's supposed to be used outside of the Table API, do you think we
can add the Row and Schema convenience layer, like the one I started to
implement, to the core flink-table? It would make it much easier to work
with the regular DataSet API.

Thanks,
Johann

On 30 October 2015 at 03:34, Stephan Ewen <se...@apache.org> wrote:

> Hi Johann!
>
> You can try and use the Table API, it has logical tuples that you program
> with, rather than tuple classes.
>
> Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html
>
> Stephan
>
>
> On Thu, Oct 29, 2015 at 6:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Johann,
>>
>> I see three options for your use case.
>>
>> 1) Generate Pojo code at planning time, i.e., when the program is
>> composed. This does not work when the program is already running. The
>> benefit is that you can use key expressions, have typed fields, and type
>> specific serializers and comparators.
>>
>> 2) Use Record, an Object[], or List<Object> (or some own holder with a
>> few convenience methods) to store the data untyped and work with
>> KeySelectors to extract grouping and join keys. The drawback of this
>> approach is the generic serializers (Kryo), which won't be as efficient as
>> the native ones. If you know the key types and don't use generic types for
>> keys, sorting and joining should still be fast.
>>
>> 3) A hybrid approach of both, which works without code generation. Use a
>> generic holder, e.g., Object[], for your data records but implement your own
>> type information, serializers, and comparators. After each operation, you
>> can define the type information of the result using the returns() method,
>> e.g.:
>> myData.map(new MapFunction<Object[], Object[]>() { ... })
>>     .returns(myCustomTypeInfo);
>> This approach requires a good
>> understanding of Flink's type system, but if done correctly, you can also
>> use expressions or positions to define keys and benefit from efficient
>> serialization and binary comparisons. However, similar to the first
>> approach, you need to know the schema of the data in advance (before the
>> program is executed).
>>
>> In my opinion the first approach is the best, but as you said it is
>> more effort to implement and might not work depending on what information
>> is available at which point in time.
>>
>> Let me know if you have any questions.
>>
>> Cheers, Fabian
>>
>> 2015-10-28 20:01 GMT+01:00 Johann Kovacs <m...@jkovacs.de>:
>>
>>> Hi all,
>>>
>>> I currently find myself evaluating a use case, where I have to deal
>>> with wide (i.e. about 50-60 columns, definitely more than the 25
>>> supported by the Tuple types), structured data from CSV files, with a
>>> potentially dynamically (during runtime) generated (or automatically
>>> inferred from the CSV file) schema.
>>> SparkSQL works very well for this case, because I can generate or
>>> infer the schema dynamically at runtime, access fields in UDFs via
>>> index or name (via the Row API), generate new schemata for UDF results
>>> on the fly, and use those schemata to read and write from/to CSV.
>>> Obviously Spark and SparkSQL have other quirks and I'd like to find a
>>> good solution to do this with Flink.
>>>
>>> The main limitation seems to be that I can't seem to have DataSets of
>>> arbitrary-length, arbitrary-type (i.e. unknown during compile time),
>>> tuples. The Record API/type looks like it was meant to provide
>>> something like that but it appears to be headed for deprecation and is not
>>> well supported by the DataSet APIs (e.g. I can't do a join on Records
>>> by field index, nor does the CsvReader API support Records), and it
>>> has no concept of field names, either.
>>>
>>> I thought about generating Java classes of my schemata at runtime (e.g.
>>> via Javassist), but that seems like a hack, and I'd probably have to
>>> do this for each intermediate schema as well (e.g. when a map
>>> operation alters the schema). I haven't tried this avenue yet, so I'm
>>> not certain it would actually work, and even less certain that this is
>>> a nice and maintainable solution.
>>>
>>> Can anyone suggest a nice way to deal with this kind of use case? I
>>> can prepare an example if that would make it more clear.
>>>
>>> Thanks,
>>> Johann
>>>
>>
>>
>
