Hi Shahar,

An easier way to solve your problem is to store your data in a Row instead
of a `TaggedEvent`. I think this is what Fabian meant. That way, you don't
have to define a user-defined type factory and can use the Row type
directly. Taking `TaggedEvent<Car>` as an example, the corresponding row
type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
Types.OBJECT_ARRAY(Types.STRING))`, where Types is
`org.apache.flink.table.api.Types`. Furthermore, the Row type works more
smoothly with the Table API & SQL.
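
To make that layout concrete, here is a minimal, dependency-free sketch. It is only an illustration under assumptions: `Object[]` stands in for Flink's `Row`, and the names `TaggedRowSketch` and `toNestedRow` as well as the sample values are hypothetical, not part of Flink or of your code.

```java
import java.util.Arrays;

// Sketch only: Object[] plays the role of Flink's Row so the nesting is visible
// without any Flink dependency.
final class TaggedRowSketch {

    // Mirrors the Car class from the thread.
    static final class Car {
        final int year;
        final String modelName;

        Car(int year, String modelName) {
            this.year = year;
            this.modelName = modelName;
        }
    }

    // Field 0: nested row (year, modelName), i.e. ROW(INT, STRING)
    // Field 1: the tags,                     i.e. OBJECT_ARRAY(STRING)
    static Object[] toNestedRow(Car car, String[] tags) {
        Object[] origin = { car.year, car.modelName };
        return new Object[] { origin, tags.clone() };
    }

    public static void main(String[] args) {
        // Hypothetical sample values.
        Object[] row = toNestedRow(new Car(2018, "Luxury GT"), new String[] { "NiceCar" });
        System.out.println(Arrays.deepToString(row)); // prints [[2018, Luxury GT], [NiceCar]]
    }
}
```

In an actual job you would presumably build the value with `Row.of(...)` and declare it with the row type given above instead of using plain arrays.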

However, if `TaggedEvent` is a must-have for you, take a look at
MapView[1] as an example of how to define a user-defined type
factory.

Best, Hequn

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala

On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Hi Fabian,
>
> I'm having trouble implementing the type for this operation, and I wonder
> how I can do that.
> So given a generic type T, I want to create a TypeInformation for:
> class TaggedEvent<T> {
>    String[] tags;
>    T originalEvent;
> }
>
> I tried a few different things but am not sure how to do it.
> It doesn't seem like I can use TypeHint, since I would need to know the
> actual generic class for it, right?
> Do I need a TaggedEventTypeFactory? If so, how do I create the
> TaggedEventTypeInfo for it? Do you have an example? I was trying to
> follow this[1], but it doesn't seem to work: I'm getting null as my
> genericParameter for some reason. Also, how would you create the serializer
> for the type info? Can I reuse some built-in Kryo functionality?
>
> Thanks
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>
>
>
>
>
> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> I'm looking into a way to enrich it without having to know the internal
>> fields of the original event type.
>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>> prior to the SQL query, with tags being empty, and then run the SQL query
>> selecting *origin, enrich(.. ) as tags*.
>> Not sure there's a better way, but I guess that works.
>>
>>
>>
>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You can use the value construction function ROW to create a nested row
>>> (or object).
>>> However, you have to explicitly reference all attributes that you will
>>> add.
>>>
>>> If you have a table Cars with (year, modelName), a query could look like
>>> this:
>>>
>>> SELECT
>>>   ROW(year, modelName) AS car,
>>>   enrich(year, modelName) AS tags
>>> FROM Cars;
>>>
>>> Handling many attributes is always a bit painful in SQL.
>>> There is an effort to make the Table API easier to use for these use
>>> cases (for example Column Operations [1]).
>>>
>>> Best, Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>
>>>
>>>
>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>> shahar.kobrin...@gmail.com>:
>>>
>>>> Just to be clearer about my goal:
>>>> I'm trying to enrich the incoming stream with some meaningful tags based
>>>> on conditions from the event itself.
>>>> So the input stream could contain an event that looks like:
>>>> class Car {
>>>>   int year;
>>>>   String modelName;
>>>> }
>>>>
>>>> I will have a config that defines tags as:
>>>> "NiceCar" -> "year > 2015 AND position('Luxury' in modelName) > 0"
>>>>
>>>> So ideally my output will be in the structure of
>>>>
>>>> class TaggedEvent<Car> {
>>>>    Car origin;
>>>>    String[] tags;
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
